LCOV - differential code coverage report
Current view: top level - src/backend/commands - async.c (source / functions)
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7
Current Date: 2026-03-14 14:10:32 -0400
Baseline: lcov-20260315-024220-baseline
Baseline Date: 2026-03-14 15:27:56 +0100
Legend: Lines: hit / not hit
        Branches: + taken, - not taken, # not executed

Coverage Total Hit UNC LBC UBC GBC GNC CBC ECB DUB DCB
Lines: 88.9 % 840 747 28 1 64 29 266 452 2 7 65
Functions: 96.4 % 55 53 1 1 1 30 22 4
Branches: 65.8 % 558 367 62 2 127 17 136 214 25 45

Line coverage date bins:
(1,7] days: 37.5 % 8 3 5 3
(30,360] days: 86.0 % 344 296 23 25 263 33
(360..) days: 91.8 % 488 448 1 39 29 419 2

Function coverage date bins:
(1,7] days: 0.0 % 1 0 1
(30,360] days: 100.0 % 14 14 14
(360..) days: 97.5 % 40 39 1 1 16 22

Branch coverage date bins:
(30,360] days: 63.5 % 222 141 62 1 18 136 5
(360..) days: 67.3 % 336 226 1 109 17 209

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * async.c
                                  4                 :                :  *    Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                  7                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :                :  *
                                  9                 :                :  * IDENTIFICATION
                                 10                 :                :  *    src/backend/commands/async.c
                                 11                 :                :  *
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : /*-------------------------------------------------------------------------
                                 16                 :                :  * Async Notification Model as of v19:
                                 17                 :                :  *
                                  18                 :                :  * 1. Multiple backends on the same machine.  Multiple backends may be
                                  19                 :                :  *    listening on each of several channels.
                                 20                 :                :  *
                                 21                 :                :  * 2. There is one central queue in disk-based storage (directory pg_notify/),
                                 22                 :                :  *    with actively-used pages mapped into shared memory by the slru.c module.
                                 23                 :                :  *    All notification messages are placed in the queue and later read out
                                 24                 :                :  *    by listening backends.  The single queue allows us to guarantee that
                                 25                 :                :  *    notifications are received in commit order.
                                 26                 :                :  *
                                 27                 :                :  *    Although there is only one queue, notifications are treated as being
                                 28                 :                :  *    database-local; this is done by including the sender's database OID
                                 29                 :                :  *    in each notification message.  Listening backends ignore messages
                                 30                 :                :  *    that don't match their database OID.  This is important because it
                                 31                 :                :  *    ensures senders and receivers have the same database encoding and won't
                                 32                 :                :  *    misinterpret non-ASCII text in the channel name or payload string.
                                 33                 :                :  *
                                 34                 :                :  *    Since notifications are not expected to survive database crashes,
                                 35                 :                :  *    we can simply clean out the pg_notify data at any reboot, and there
                                 36                 :                :  *    is no need for WAL support or fsync'ing.
                                 37                 :                :  *
                                 38                 :                :  * 3. Every backend that is listening on at least one channel registers by
                                 39                 :                :  *    entering its PID into the array in AsyncQueueControl. It then scans all
                                 40                 :                :  *    incoming notifications in the central queue and first compares the
                                 41                 :                :  *    database OID of the notification with its own database OID and then
                                 42                 :                :  *    compares the notified channel with the list of channels that it listens
                                 43                 :                :  *    to. In case there is a match it delivers the notification event to its
                                 44                 :                :  *    frontend.  Non-matching events are simply skipped.
                                 45                 :                :  *
                                 46                 :                :  * 4. The NOTIFY statement (routine Async_Notify) stores the notification in
                                 47                 :                :  *    a backend-local list which will not be processed until transaction end.
                                 48                 :                :  *
                                 49                 :                :  *    Duplicate notifications from the same transaction are sent out as one
                                  50                 :                :  *    notification only. This is done to save work when, for example, a trigger
                                  51                 :                :  *    on a 2-million-row table fires a notification for each row that has been
                                 52                 :                :  *    changed. If the application needs to receive every single notification
                                 53                 :                :  *    that has been sent, it can easily add some unique string into the extra
                                 54                 :                :  *    payload parameter.
                                 55                 :                :  *
                                 56                 :                :  *    When the transaction is ready to commit, PreCommit_Notify() adds the
                                 57                 :                :  *    pending notifications to the head of the queue. The head pointer of the
                                 58                 :                :  *    queue always points to the next free position and a position is just a
                                 59                 :                :  *    page number and the offset in that page. This is done before marking the
                                 60                 :                :  *    transaction as committed in clog. If we run into problems writing the
                                 61                 :                :  *    notifications, we can still call elog(ERROR, ...) and the transaction
                                 62                 :                :  *    will roll back safely.
                                 63                 :                :  *
                                 64                 :                :  *    Once we have put all of the notifications into the queue, we return to
                                 65                 :                :  *    CommitTransaction() which will then do the actual transaction commit.
                                 66                 :                :  *
                                 67                 :                :  *    After commit we are called another time (AtCommit_Notify()). Here we
                                 68                 :                :  *    make any required updates to the effective listen state (see below).
                                 69                 :                :  *    Then we signal any backends that may be interested in our messages
                                 70                 :                :  *    (including our own backend, if listening).  This is done by
                                 71                 :                :  *    SignalBackends(), which sends a PROCSIG_NOTIFY_INTERRUPT signal to
                                 72                 :                :  *    each relevant backend, as described below.
                                 73                 :                :  *
                                 74                 :                :  *    Finally, after we are out of the transaction altogether and about to go
                                 75                 :                :  *    idle, we scan the queue for messages that need to be sent to our
                                 76                 :                :  *    frontend (which might be notifies from other backends, or self-notifies
                                 77                 :                :  *    from our own).  This step is not part of the CommitTransaction sequence
                                 78                 :                :  *    for two important reasons.  First, we could get errors while sending
                                 79                 :                :  *    data to our frontend, and it's really bad for errors to happen in
                                 80                 :                :  *    post-commit cleanup.  Second, in cases where a procedure issues commits
                                 81                 :                :  *    within a single frontend command, we don't want to send notifies to our
                                 82                 :                :  *    frontend until the command is done; but notifies to other backends
                                 83                 :                :  *    should go out immediately after each commit.
                                 84                 :                :  *
                                 85                 :                :  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
                                 86                 :                :  *    sets the process's latch, which triggers the event to be processed
                                 87                 :                :  *    immediately if this backend is idle (i.e., it is waiting for a frontend
                                  88                 :                :  *    command and is not within a transaction block; cf.
                                 89                 :                :  *    ProcessClientReadInterrupt()).  Otherwise the handler may only set a
                                 90                 :                :  *    flag, which will cause the processing to occur just before we next go
                                 91                 :                :  *    idle.
                                 92                 :                :  *
                                 93                 :                :  *    Inbound-notify processing consists of reading all of the notifications
                                 94                 :                :  *    that have arrived since scanning last time. We read every notification
                                 95                 :                :  *    until we reach either a notification from an uncommitted transaction or
                                 96                 :                :  *    the head pointer's position.
                                 97                 :                :  *
                                 98                 :                :  * 6. To limit disk space consumption, the tail pointer needs to be advanced
                                 99                 :                :  *    so that old pages can be truncated. This is relatively expensive
                                100                 :                :  *    (notably, it requires an exclusive lock), so we don't want to do it
                                101                 :                :  *    often. We make sending backends do this work if they advanced the queue
                                102                 :                :  *    head into a new page, but only once every QUEUE_CLEANUP_DELAY pages.
                                103                 :                :  *
                                104                 :                :  * 7. So far we have not discussed how backends change their listening state,
                                105                 :                :  *    nor how notification senders know which backends to awaken.  To handle
                                106                 :                :  *    the latter, we maintain a global channel table (implemented as a dynamic
                                107                 :                :  *    shared hash table, or dshash) that maps channel names to the set of
                                108                 :                :  *    backends listening on each channel.  This table is created lazily on the
                                109                 :                :  *    first LISTEN command and grows dynamically as needed.  There is also a
                                110                 :                :  *    local channel table (a plain dynahash table) in each listening backend,
                                111                 :                :  *    tracking which channels that backend is listening to.  The local table
                                112                 :                :  *    serves to reduce the number of accesses needed to the shared table.
                                113                 :                :  *
                                114                 :                :  *    If the current transaction has executed any LISTEN/UNLISTEN actions,
                                115                 :                :  *    PreCommit_Notify() prepares to commit those.  For LISTEN, it
                                116                 :                :  *    pre-allocates entries in both the per-backend localChannelTable and the
                                117                 :                :  *    shared globalChannelTable (with listening=false so that these entries
                                118                 :                :  *    are no-ops for the moment).  It also records the final per-channel
                                119                 :                :  *    intent in pendingListenActions, so post-commit/abort processing can
                                120                 :                :  *    apply that in a single step.  Since all these allocations happen before
                                121                 :                :  *    committing to clog, we can safely abort the transaction on failure.
                                122                 :                :  *
                                123                 :                :  *    After commit, AtCommit_Notify() runs through pendingListenActions and
                                124                 :                :  *    updates the backend's per-channel listening flags to activate or
                                125                 :                :  *    deactivate listening.  This happens before sending signals.
                                126                 :                :  *
                                127                 :                :  *    SignalBackends() consults the shared global channel table to identify
                                128                 :                :  *    listeners for the channels that the current transaction sent
                                129                 :                :  *    notification(s) to.  Each selected backend is marked as having a wakeup
                                130                 :                :  *    pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
                                131                 :                :  *    signal is sent to it.
                                132                 :                :  *
                                133                 :                :  * 8. While writing notifications, PreCommit_Notify() records the queue head
                                134                 :                :  *    position both before and after the write.  Because all writers serialize
                                135                 :                :  *    on a cluster-wide heavyweight lock, no other backend can insert entries
                                136                 :                :  *    between these two points.  SignalBackends() uses this fact to directly
                                137                 :                :  *    advance the queue pointer for any backend that is still positioned at
                                138                 :                :  *    the old head, or within the range written, but is not interested in any
                                139                 :                :  *    of our notifications.  This avoids unnecessary wakeups for idle
                                140                 :                :  *    listeners that have nothing to read.  Backends that are not interested
                                141                 :                :  *    in our notifications, but cannot be directly advanced, are signaled only
                                142                 :                :  *    if they are far behind the current queue head; that is to ensure that
                                143                 :                :  *    we can advance the queue tail without undue delay.
                                144                 :                :  *
                                145                 :                :  * An application that listens on the same channel it notifies will get
                                146                 :                :  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
                                147                 :                :  * by comparing be_pid in the NOTIFY message to the application's own backend's
                                148                 :                :  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
                                149                 :                :  * frontend during startup.)  The above design guarantees that notifies from
                                150                 :                :  * other backends will never be missed by ignoring self-notifies.
                                151                 :                :  *
                                152                 :                :  * The amount of shared memory used for notify management (notify_buffers)
                                153                 :                :  * can be varied without affecting anything but performance.  The maximum
                                154                 :                :  * amount of notification data that can be queued at one time is determined
                                155                 :                :  * by the max_notify_queue_pages GUC.
                                156                 :                :  *-------------------------------------------------------------------------
                                157                 :                :  */
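
Step 4 of the header comment says that duplicate notifications issued within one transaction are sent only once. The standalone sketch below illustrates that idea and is not the code in this file: PendingList, PendingNotify, pending_add, and MAX_PENDING are hypothetical names, and the fixed-size array with a linear scan is a simplification chosen for brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_PENDING 64          /* hypothetical capacity for this sketch */

typedef struct PendingNotify
{
    const char *channel;        /* caller keeps the strings alive */
    const char *payload;
} PendingNotify;

typedef struct PendingList
{
    int           count;
    PendingNotify items[MAX_PENDING];
} PendingList;

/*
 * Remember (channel, payload) for delivery at commit, unless an identical
 * pair is already pending.  Returns true if the event was added and false
 * if it was collapsed into an existing entry.
 */
static inline bool
pending_add(PendingList *list, const char *channel, const char *payload)
{
    assert(list != NULL && channel != NULL && payload != NULL);

    for (int i = 0; i < list->count; i++)
    {
        if (strcmp(list->items[i].channel, channel) == 0 &&
            strcmp(list->items[i].payload, payload) == 0)
            return false;       /* duplicate within this transaction */
    }

    assert(list->count < MAX_PENDING);
    list->items[list->count].channel = channel;
    list->items[list->count].payload = payload;
    list->count++;
    return true;
}
```

Because only exact (channel, payload) matches are collapsed, an application that needs every event can make each payload unique, as the comment suggests.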
                                158                 :                : 
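
Steps 7 and 8 of the header comment describe how a sender treats each listening backend after writing to the queue. The sketch below restates that decision as a pure function under stated assumptions: choose_listener_action and the ListenerAction values are hypothetical names, the QUEUE_CLEANUP_DELAY value is a placeholder, page numbers are assumed to compare with a plain "<", and the wakeup-pending bookkeeping is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_CLEANUP_DELAY 4   /* placeholder value, for illustration only */

typedef struct QueuePosition
{
    int64_t page;               /* SLRU page number */
    int     offset;             /* byte offset within page */
} QueuePosition;

typedef enum ListenerAction
{
    LISTENER_LEAVE_ALONE,       /* not interested, not far behind */
    LISTENER_ADVANCE,           /* move its pointer to the new head, no signal */
    LISTENER_SIGNAL             /* send it a notify interrupt */
} ListenerAction;

/* True if x comes before y in queue order (assumes no page wraparound). */
static inline bool
pos_precedes(QueuePosition x, QueuePosition y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * old_head and new_head are the queue head before and after our write;
 * pos is the listener's current read position.
 */
static inline ListenerAction
choose_listener_action(bool interested, QueuePosition pos,
                       QueuePosition old_head, QueuePosition new_head)
{
    assert(!pos_precedes(new_head, old_head));

    if (interested)
        return LISTENER_SIGNAL;

    /* At the old head or inside the range we wrote: skip over our entries. */
    if (!pos_precedes(pos, old_head) && pos_precedes(pos, new_head))
        return LISTENER_ADVANCE;

    /* Otherwise wake it only if it lags far enough to hold back the tail. */
    if (new_head.page - pos.page >= QUEUE_CLEANUP_DELAY)
        return LISTENER_SIGNAL;

    return LISTENER_LEAVE_ALONE;
}
```

Direct advancement is safe only because, as step 8 notes, no other backend can insert entries between the two recorded head positions.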
                                159                 :                : #include "postgres.h"
                                160                 :                : 
                                161                 :                : #include <limits.h>
                                162                 :                : #include <unistd.h>
                                163                 :                : #include <signal.h>
                                164                 :                : 
                                165                 :                : #include "access/parallel.h"
                                166                 :                : #include "access/slru.h"
                                167                 :                : #include "access/transam.h"
                                168                 :                : #include "access/xact.h"
                                169                 :                : #include "catalog/pg_database.h"
                                170                 :                : #include "commands/async.h"
                                171                 :                : #include "common/hashfn.h"
                                172                 :                : #include "funcapi.h"
                                173                 :                : #include "lib/dshash.h"
                                174                 :                : #include "libpq/libpq.h"
                                175                 :                : #include "libpq/pqformat.h"
                                176                 :                : #include "miscadmin.h"
                                177                 :                : #include "storage/dsm_registry.h"
                                178                 :                : #include "storage/ipc.h"
                                179                 :                : #include "storage/latch.h"
                                180                 :                : #include "storage/lmgr.h"
                                181                 :                : #include "storage/procsignal.h"
                                182                 :                : #include "tcop/tcopprot.h"
                                183                 :                : #include "utils/builtins.h"
                                184                 :                : #include "utils/dsa.h"
                                185                 :                : #include "utils/guc_hooks.h"
                                186                 :                : #include "utils/memutils.h"
                                187                 :                : #include "utils/ps_status.h"
                                188                 :                : #include "utils/snapmgr.h"
                                189                 :                : #include "utils/timestamp.h"
                                190                 :                : 
                                191                 :                : 
                                192                 :                : /*
                                 193                 :                :  * Maximum size of a NOTIFY payload, including terminating NUL.  This
                                194                 :                :  * must be kept small enough so that a notification message fits on one
                                195                 :                :  * SLRU page.  The magic fudge factor here is noncritical as long as it's
                                196                 :                :  * more than AsyncQueueEntryEmptySize --- we make it significantly bigger
                                197                 :                :  * than that, so changes in that data structure won't affect user-visible
                                198                 :                :  * restrictions.
                                199                 :                :  */
                                200                 :                : #define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)
                                201                 :                : 
                                202                 :                : /*
                                203                 :                :  * Struct representing an entry in the global notify queue
                                204                 :                :  *
                                205                 :                :  * This struct declaration has the maximal length, but in a real queue entry
                                206                 :                :  * the data area is only big enough for the actual channel and payload strings
                                207                 :                :  * (each null-terminated).  AsyncQueueEntryEmptySize is the minimum possible
                                208                 :                :  * entry size, if both channel and payload strings are empty (but note it
                                209                 :                :  * doesn't include alignment padding).
                                210                 :                :  *
                                211                 :                :  * The "length" field should always be rounded up to the next QUEUEALIGN
                                212                 :                :  * multiple so that all fields are properly aligned.
                                213                 :                :  */
                                214                 :                : typedef struct AsyncQueueEntry
                                215                 :                : {
                                216                 :                :     int         length;         /* total allocated length of entry */
                                217                 :                :     Oid         dboid;          /* sender's database OID */
                                218                 :                :     TransactionId xid;          /* sender's XID */
                                219                 :                :     int32       srcPid;         /* sender's PID */
                                220                 :                :     char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
                                221                 :                : } AsyncQueueEntry;
                                222                 :                : 
                                223                 :                : /* Currently, no field of AsyncQueueEntry requires more than int alignment */
                                224                 :                : #define QUEUEALIGN(len)     INTALIGN(len)
                                225                 :                : 
                                226                 :                : #define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)
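
The definitions above imply how large a real queue entry is: the fixed header, the two NUL-terminated strings, and alignment padding. The standalone sketch below works through that arithmetic. It assumes BLCKSZ = 8192 and NAMEDATALEN = 64, uses uint32_t as a stand-in for Oid and TransactionId, and approximates INTALIGN as rounding up to a multiple of sizeof(int); entry_length is a hypothetical helper name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Assumed stand-ins for the PostgreSQL build-time constants. */
#define BLCKSZ       8192
#define NAMEDATALEN  64
#define NOTIFY_PAYLOAD_MAX_LENGTH   (BLCKSZ - NAMEDATALEN - 128)

typedef struct AsyncQueueEntry
{
    int         length;         /* total allocated length of entry */
    uint32_t    dboid;          /* stand-in for Oid */
    uint32_t    xid;            /* stand-in for TransactionId */
    int32_t     srcPid;         /* sender's PID */
    char        data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH];
} AsyncQueueEntry;

/* Round up to a multiple of sizeof(int); assumed equivalent of INTALIGN. */
#define QUEUEALIGN(len) \
    (((size_t) (len) + sizeof(int) - 1) & ~(sizeof(int) - 1))

/* Header plus the two terminating NUL bytes of empty strings. */
#define AsyncQueueEntryEmptySize    (offsetof(AsyncQueueEntry, data) + 2)

/* Space one entry occupies in the queue for the given strings. */
static inline size_t
entry_length(const char *channel, const char *payload)
{
    size_t      channellen = strlen(channel);
    size_t      payloadlen = strlen(payload);

    assert(channellen < NAMEDATALEN);
    assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);

    return QUEUEALIGN(AsyncQueueEntryEmptySize + channellen + payloadlen);
}
```

With these assumed constants, even the longest permitted channel and payload produce an entry smaller than BLCKSZ, which is the property the NOTIFY_PAYLOAD_MAX_LENGTH comment asks for.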
                                227                 :                : 
                                228                 :                : /*
                                229                 :                :  * Struct describing a queue position, and assorted macros for working with it
                                230                 :                :  */
                                231                 :                : typedef struct QueuePosition
                                232                 :                : {
                                233                 :                :     int64       page;           /* SLRU page number */
                                234                 :                :     int         offset;         /* byte offset within page */
                                235                 :                : } QueuePosition;
                                236                 :                : 
                                237                 :                : #define QUEUE_POS_PAGE(x)       ((x).page)
                                238                 :                : #define QUEUE_POS_OFFSET(x)     ((x).offset)
                                239                 :                : 
                                240                 :                : #define SET_QUEUE_POS(x,y,z) \
                                241                 :                :     do { \
                                242                 :                :         (x).page = (y); \
                                243                 :                :         (x).offset = (z); \
                                244                 :                :     } while (0)
                                245                 :                : 
                                246                 :                : #define QUEUE_POS_EQUAL(x,y) \
                                247                 :                :     ((x).page == (y).page && (x).offset == (y).offset)
                                248                 :                : 
                                249                 :                : #define QUEUE_POS_IS_ZERO(x) \
                                250                 :                :     ((x).page == 0 && (x).offset == 0)
                                251                 :                : 
                                252                 :                : /* choose logically smaller QueuePosition */
                                253                 :                : #define QUEUE_POS_MIN(x,y) \
                                254                 :                :     (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \
                                255                 :                :      (x).page != (y).page ? (y) : \
                                256                 :                :      (x).offset < (y).offset ? (x) : (y))
                                257                 :                : 
                                258                 :                : /* choose logically larger QueuePosition */
                                259                 :                : #define QUEUE_POS_MAX(x,y) \
                                260                 :                :     (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
                                261                 :                :      (x).page != (y).page ? (x) : \
                                262                 :                :      (x).offset > (y).offset ? (x) : (y))
                                263                 :                : 
                                264                 :                : /* returns true if x comes before y in queue order */
                                265                 :                : #define QUEUE_POS_PRECEDES(x,y) \
                                266                 :                :     (asyncQueuePagePrecedes((x).page, (y).page) || \
                                267                 :                :      ((x).page == (y).page && (x).offset < (y).offset))
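
A minimal standalone sketch of how the position-comparison macros above behave. It assumes a QueuePosition made of an int64 page plus an int offset, and it substitutes a plain `<` for asyncQueuePagePrecedes(); both are stand-ins, since neither definition appears in this part of the listing. All `sketch_`/`SKETCH_` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed layout; the real QueuePosition is defined earlier in async.c. */
typedef struct QueuePosition
{
    int64_t     page;
    int         offset;
} QueuePosition;

/* Stand-in for asyncQueuePagePrecedes(); assumes page numbers never wrap. */
static bool
sketch_page_precedes(int64_t p, int64_t q)
{
    return p < q;
}

/* Same shape as QUEUE_POS_PRECEDES: compare pages first, then offsets. */
#define SKETCH_POS_PRECEDES(x,y) \
    (sketch_page_precedes((x).page, (y).page) || \
     ((x).page == (y).page && (x).offset < (y).offset))

/* Same shape as QUEUE_POS_MIN. */
#define SKETCH_POS_MIN(x,y) \
    (sketch_page_precedes((x).page, (y).page) ? (x) : \
     (x).page != (y).page ? (y) : \
     (x).offset < (y).offset ? (x) : (y))

/*
 * Fold SKETCH_POS_MIN over n listener positions, starting from the head.
 * The result is the logically smallest position among head and pos[0..n-1].
 */
static QueuePosition
sketch_min_position(const QueuePosition *pos, int n, QueuePosition head)
{
    QueuePosition min = head;

    assert(n >= 0);
    for (int i = 0; i < n; i++)
    {
        QueuePosition p = pos[i];

        min = SKETCH_POS_MIN(min, p);
    }
    return min;
}
```

Folding the minimum over all listener positions is one way to find how far a tail pointer could safely move; whether the real code does exactly this is not shown in this excerpt.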
                                268                 :                : 
                                269                 :                : /*
                                270                 :                :  * Parameter determining how often we try to advance the tail pointer:
                                271                 :                :  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
                                272                 :                :  * also the distance by which a backend that's not interested in our
                                273                 :                :  * notifications needs to be behind before we'll decide we need to wake it
                                274                 :                :  * up so it can advance its pointer.
                                275                 :                :  *
                                276                 :                :  * Resist the temptation to make this really large.  While that would save
                                277                 :                :  * work in some places, it would add cost in others.  In particular, this
                                278                 :                :  * should likely be less than notify_buffers, to ensure that backends
                                279                 :                :  * catch up before the pages they'll need to read fall out of SLRU cache.
                                280                 :                :  */
                                281                 :                : #define QUEUE_CLEANUP_DELAY 4
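
A rough sketch of the two roles the comment above gives QUEUE_CLEANUP_DELAY: deciding when to attempt tail advancement, and deciding when an uninterested backend has fallen far enough behind to be woken. The function names and the exact comparisons are assumptions made for illustration; the real checks live elsewhere in async.c and are not shown in this excerpt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_CLEANUP_DELAY 4      /* mirrors QUEUE_CLEANUP_DELAY */

/*
 * Hypothetical check: attempt tail advancement only when the head has just
 * moved onto a page number that is a multiple of the delay.
 */
static bool
sketch_should_try_advance_tail(int64_t new_head_page)
{
    assert(new_head_page >= 0);
    return new_head_page % SKETCH_CLEANUP_DELAY == 0;
}

/*
 * Hypothetical check: a backend that does not care about our notifications
 * is woken only once it lags the head by at least the delay, in pages.
 */
static bool
sketch_needs_catchup_wakeup(int64_t head_page, int64_t backend_page)
{
    assert(backend_page <= head_page);
    return head_page - backend_page >= SKETCH_CLEANUP_DELAY;
}
```

With a delay of 4, a backend three pages behind is left alone while one four pages behind is signaled. This illustrates why a very large delay would let idle backends drift far from the head.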
                                282                 :                : 
                                283                 :                : /*
                                284                 :                :  * Struct describing a listening backend's status
                                285                 :                :  */
                                286                 :                : typedef struct QueueBackendStatus
                                287                 :                : {
                                288                 :                :     int32       pid;            /* either a PID or InvalidPid */
                                289                 :                :     Oid         dboid;          /* backend's database OID, or InvalidOid */
                                290                 :                :     ProcNumber  nextListener;   /* id of next listener, or INVALID_PROC_NUMBER */
                                291                 :                :     QueuePosition pos;          /* backend has read queue up to here */
                                292                 :                :     bool        wakeupPending;  /* signal sent to backend, not yet processed */
                                293                 :                :     bool        isAdvancing;    /* backend is advancing its position */
                                294                 :                : } QueueBackendStatus;
                                295                 :                : 
                                296                 :                : /*
                                297                 :                :  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
                                298                 :                :  *
                                299                 :                :  * The AsyncQueueControl structure is protected by the NotifyQueueLock and
                                300                 :                :  * NotifyQueueTailLock.
                                301                 :                :  *
                                302                 :                :  * When holding NotifyQueueLock in SHARED mode, backends may only inspect
                                303                 :                :  * their own entries as well as the head and tail pointers. Consequently we
                                304                 :                :  * can allow a backend to update its own record while holding only SHARED lock
                                305                 :                :  * (since no other backend will inspect it).
                                306                 :                :  *
                                307                 :                :  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
                                308                 :                :  * entries of other backends and also change the head pointer. They can
                                309                 :                :  * also advance other backends' queue positions, unless the other backend
                                310                 :                :  * has isAdvancing set (i.e., is in process of doing that itself).
                                311                 :                :  *
                                312                 :                :  * When holding both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE
                                313                 :                :  * mode, backends can change the tail pointers.
                                314                 :                :  *
                                315                 :                :  * The SLRU buffer pool is divided into banks, and the bank-wise SLRU lock is
                                316                 :                :  * used as the control lock for the pg_notify SLRU buffers.
                                317                 :                :  * In order to avoid deadlocks, whenever we need multiple locks, we first get
                                318                 :                :  * NotifyQueueTailLock, then NotifyQueueLock, then SLRU bank lock, and lastly
                                319                 :                :  * globalChannelTable partition locks.
                                320                 :                :  *
                                321                 :                :  * Each backend uses the backend[] array entry with index equal to its
                                322                 :                :  * ProcNumber.  We rely on this to make SendProcSignal fast.
                                323                 :                :  *
                                324                 :                :  * The backend[] array entries for actively-listening backends are threaded
                                325                 :                :  * together using firstListener and the nextListener links, so that we can
                                326                 :                :  * scan them without having to iterate over inactive entries.  We keep this
                                327                 :                :  * list in order by ProcNumber so that the scan is cache-friendly when there
                                328                 :                :  * are many active entries.
                                329                 :                :  */
                                330                 :                : typedef struct AsyncQueueControl
                                331                 :                : {
                                332                 :                :     QueuePosition head;         /* head points to the next free location */
                                333                 :                :     QueuePosition tail;         /* tail must be <= the queue position of every
                                334                 :                :                                  * listening backend */
                                335                 :                :     int64       stopPage;       /* oldest unrecycled page; must be <=
                                336                 :                :                                  * tail.page */
                                337                 :                :     ProcNumber  firstListener;  /* id of first listener, or
                                338                 :                :                                  * INVALID_PROC_NUMBER */
                                339                 :                :     TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
                                340                 :                :     dsa_handle  globalChannelTableDSA;  /* global channel table's DSA handle */
                                341                 :                :     dshash_table_handle globalChannelTableDSH;  /* and its dshash handle */
                                342                 :                :     /* Array with room for MaxBackends entries: */
                                343                 :                :     QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
                                344                 :                : } AsyncQueueControl;
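
A small sketch of the listener threading described in the comment above: array slots indexed by process number, linked through nextListener and kept in index order. The struct, the sentinel value, and the insertion routine are simplified stand-ins, not the real AsyncQueueControl code, and the sketch ignores locking entirely.

```c
#include <assert.h>

#define SKETCH_INVALID_PROC (-1)    /* stand-in for INVALID_PROC_NUMBER */

typedef struct SketchBackend
{
    int         pid;            /* unused in this sketch */
    int         nextListener;   /* index of next listener, or invalid */
} SketchBackend;

/* Link slot "me" into the list, keeping the list sorted by slot index. */
static void
sketch_add_listener(SketchBackend *backend, int *firstListener, int me)
{
    int         prev = SKETCH_INVALID_PROC;
    int         cur = *firstListener;

    while (cur != SKETCH_INVALID_PROC && cur < me)
    {
        prev = cur;
        cur = backend[cur].nextListener;
    }
    assert(cur != me);          /* must not already be listed */

    backend[me].nextListener = cur;
    if (prev == SKETCH_INVALID_PROC)
        *firstListener = me;
    else
        backend[prev].nextListener = me;
}

/* Visit only active listeners; returns how many indexes were written. */
static int
sketch_collect_listeners(const SketchBackend *backend, int firstListener,
                         int *out, int max)
{
    int         n = 0;

    for (int i = firstListener;
         i != SKETCH_INVALID_PROC && n < max;
         i = backend[i].nextListener)
        out[n++] = i;
    return n;
}
```

Because the scan follows the links, its cost depends on the number of active listeners rather than on the size of the array, and the sorted order means slots are touched in ascending memory order.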
                                345                 :                : 
                                346                 :                : static AsyncQueueControl *asyncQueueControl;
                                347                 :                : 
                                348                 :                : #define QUEUE_HEAD                  (asyncQueueControl->head)
                                349                 :                : #define QUEUE_TAIL                  (asyncQueueControl->tail)
                                350                 :                : #define QUEUE_STOP_PAGE             (asyncQueueControl->stopPage)
                                351                 :                : #define QUEUE_FIRST_LISTENER        (asyncQueueControl->firstListener)
                                352                 :                : #define QUEUE_BACKEND_PID(i)        (asyncQueueControl->backend[i].pid)
                                353                 :                : #define QUEUE_BACKEND_DBOID(i)      (asyncQueueControl->backend[i].dboid)
                                354                 :                : #define QUEUE_NEXT_LISTENER(i)      (asyncQueueControl->backend[i].nextListener)
                                355                 :                : #define QUEUE_BACKEND_POS(i)        (asyncQueueControl->backend[i].pos)
                                356                 :                : #define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
                                357                 :                : #define QUEUE_BACKEND_IS_ADVANCING(i)   (asyncQueueControl->backend[i].isAdvancing)
                                358                 :                : 
                                359                 :                : /*
                                360                 :                :  * The SLRU buffer area through which we access the notification queue
                                361                 :                :  */
                                362                 :                : static SlruCtlData NotifyCtlData;
                                363                 :                : 
                                364                 :                : #define NotifyCtl                   (&NotifyCtlData)
                                365                 :                : #define QUEUE_PAGESIZE              BLCKSZ
                                366                 :                : 
                                367                 :                : #define QUEUE_FULL_WARN_INTERVAL    5000    /* warn at most once every 5s */
                                368                 :                : 
                                369                 :                : /*
                                370                 :                :  * Global channel table definitions
                                371                 :                :  *
                                372                 :                :  * This hash table maps (database OID, channel name) keys to arrays of
                                373                 :                :  * ProcNumbers representing the backends listening or about to listen
                                374                 :                :  * on each channel.  The "listening" flags allow us to create hash table
                                375                 :                :  * entries pre-commit and not have to assume that creating them post-commit
                                376                 :                :  * will succeed.
                                377                 :                :  */
                                378                 :                : #define INITIAL_LISTENERS_ARRAY_SIZE 4
                                379                 :                : 
                                380                 :                : typedef struct GlobalChannelKey
                                381                 :                : {
                                382                 :                :     Oid         dboid;
                                383                 :                :     char        channel[NAMEDATALEN];
                                384                 :                : } GlobalChannelKey;
                                385                 :                : 
                                386                 :                : typedef struct ListenerEntry
                                387                 :                : {
                                388                 :                :     ProcNumber  procNo;         /* listener's ProcNumber */
                                389                 :                :     bool        listening;      /* true if committed listener */
                                390                 :                : } ListenerEntry;
                                391                 :                : 
                                392                 :                : typedef struct GlobalChannelEntry
                                393                 :                : {
                                394                 :                :     GlobalChannelKey key;       /* hash key */
                                395                 :                :     dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
                                396                 :                :     int         numListeners;   /* Number of listeners currently stored */
                                397                 :                :     int         allocatedListeners; /* Allocated size of array */
                                398                 :                : } GlobalChannelEntry;
                                399                 :                : 
                                400                 :                : static dshash_table *globalChannelTable = NULL;
                                401                 :                : static dsa_area *globalChannelDSA = NULL;
                                402                 :                : 
                                403                 :                : /*
                                404                 :                :  * localChannelTable caches the channel names this backend is listening on
                                405                 :                :  * (including those we have staged to be listened on, but not yet committed).
                                406                 :                :  * Used by IsListeningOn() for fast lookups when reading notifications.
                                407                 :                :  */
                                408                 :                : static HTAB *localChannelTable = NULL;
                                409                 :                : 
                                410                 :                : /* We test this condition to detect that we're not listening at all */
                                411                 :                : #define LocalChannelTableIsEmpty() \
                                412                 :                :     (localChannelTable == NULL || hash_get_num_entries(localChannelTable) == 0)
                                413                 :                : 
                                414                 :                : /*
                                415                 :                :  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
                                416                 :                :  * all actions requested in the current transaction.  As explained above,
                                417                 :                :  * we don't actually change listen state until we reach transaction commit.
                                418                 :                :  *
                                419                 :                :  * The list is kept in CurTransactionContext.  In subtransactions, each
                                420                 :                :  * subtransaction has its own list in its own CurTransactionContext, but
                                421                 :                :  * successful subtransactions attach their lists to their parent's list.
                                422                 :                :  * Failed subtransactions simply discard their lists.
                                423                 :                :  */
                                424                 :                : typedef enum
                                425                 :                : {
                                426                 :                :     LISTEN_LISTEN,
                                427                 :                :     LISTEN_UNLISTEN,
                                428                 :                :     LISTEN_UNLISTEN_ALL,
                                429                 :                : } ListenActionKind;
                                430                 :                : 
                                431                 :                : typedef struct
                                432                 :                : {
                                433                 :                :     ListenActionKind action;
                                434                 :                :     char        channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
                                435                 :                : } ListenAction;
                                436                 :                : 
                                437                 :                : typedef struct ActionList
                                438                 :                : {
                                439                 :                :     int         nestingLevel;   /* current transaction nesting depth */
                                440                 :                :     List       *actions;        /* list of ListenAction structs */
                                441                 :                :     struct ActionList *upper;   /* details for upper transaction levels */
                                442                 :                : } ActionList;
                                443                 :                : 
                                444                 :                : static ActionList *pendingActions = NULL;
                                445                 :                : 
                                446                 :                : /*
                                447                 :                :  * Hash table recording the final listen/unlisten intent per channel for
                                448                 :                :  * the current transaction.  Key is channel name, value is PENDING_LISTEN or
                                449                 :                :  * PENDING_UNLISTEN.  This keeps critical commit/abort processing to one step
                                450                 :                :  * per channel instead of replaying every action.  This is built from the
                                451                 :                :  * pendingActions list by PreCommit_Notify, then used by AtCommit_Notify or
                                452                 :                :  * AtAbort_Notify.
                                453                 :                :  */
                                454                 :                : typedef enum
                                455                 :                : {
                                456                 :                :     PENDING_LISTEN,
                                457                 :                :     PENDING_UNLISTEN,
                                458                 :                : } PendingListenAction;
                                459                 :                : 
                                460                 :                : typedef struct PendingListenEntry
                                461                 :                : {
                                462                 :                :     char        channel[NAMEDATALEN];   /* hash key */
                                463                 :                :     PendingListenAction action; /* which action should we perform? */
                                464                 :                : } PendingListenEntry;
                                465                 :                : 
                                466                 :                : static HTAB *pendingListenActions = NULL;
                                467                 :                : 
                                468                 :                : /*
                                469                 :                :  * State for outbound notifies consists of a list of all channels+payloads
                                470                 :                :  * NOTIFYed in the current transaction.  We do not actually perform a NOTIFY
                                471                 :                :  * until and unless the transaction commits.  pendingNotifies is NULL if no
                                472                 :                :  * NOTIFYs have been done in the current (sub) transaction.
                                473                 :                :  *
                                474                 :                :  * We discard duplicate notify events issued in the same transaction.
                                475                 :                :  * Hence, in addition to the list proper (which we need to track the order
                                476                 :                :  * of the events, since we guarantee to deliver them in order), we build a
                                477                 :                :  * hash table which we can probe to detect duplicates.  Since building the
                                478                 :                :  * hash table is somewhat expensive, we do so only once we have at least
                                479                 :                :  * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction;
                                480                 :                :  * before that we just scan the events linearly.
                                481                 :                :  *
                                482                 :                :  * The list is kept in CurTransactionContext.  In subtransactions, each
                                483                 :                :  * subtransaction has its own list in its own CurTransactionContext, but
                                484                 :                :  * successful subtransactions add their entries to their parent's list.
                                485                 :                :  * Failed subtransactions simply discard their lists.  Since these lists
                                486                 :                :  * are independent, there may be notify events in a subtransaction's list
                                487                 :                :  * that duplicate events in some ancestor (sub) transaction; we get rid of
                                488                 :                :  * the dups when merging the subtransaction's list into its parent's.
                                489                 :                :  *
                                490                 :                :  * Note: the action and notify lists do not interact within a transaction.
                                491                 :                :  * In particular, if a transaction does NOTIFY and then LISTEN on the same
                                492                 :                :  * condition name, it will get a self-notify at commit.  This is a bit odd
                                493                 :                :  * but is consistent with our historical behavior.
                                494                 :                :  */
                                495                 :                : typedef struct Notification
                                496                 :                : {
                                497                 :                :     uint16      channel_len;    /* length of channel-name string */
                                498                 :                :     uint16      payload_len;    /* length of payload string */
                                499                 :                :     /* null-terminated channel name, then null-terminated payload follow */
                                500                 :                :     char        data[FLEXIBLE_ARRAY_MEMBER];
                                501                 :                : } Notification;
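
A hedged sketch of the storage layout the Notification struct implies: two length fields followed by the channel name and payload packed back to back, each with its own terminator. It uses malloc and hypothetical `sketch_` helpers rather than the backend's memory contexts, so it shows the layout only, not how async.c actually allocates.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct SketchNotification
{
    uint16_t    channel_len;    /* length of channel-name string */
    uint16_t    payload_len;    /* length of payload string */
    char        data[];         /* channel, '\0', payload, '\0' */
} SketchNotification;

/* Allocate one block holding the header and both strings. */
static SketchNotification *
sketch_make_notification(const char *channel, const char *payload)
{
    size_t      clen = strlen(channel);
    size_t      plen = strlen(payload);
    SketchNotification *n;

    assert(clen <= UINT16_MAX && plen <= UINT16_MAX);

    n = malloc(offsetof(SketchNotification, data) + clen + plen + 2);
    if (n == NULL)
        return NULL;

    n->channel_len = (uint16_t) clen;
    n->payload_len = (uint16_t) plen;
    memcpy(n->data, channel, clen + 1);             /* includes '\0' */
    memcpy(n->data + clen + 1, payload, plen + 1);  /* includes '\0' */
    return n;
}

/* The channel name starts at the beginning of data[]. */
static const char *
sketch_channel(const SketchNotification *n)
{
    return n->data;
}

/* The payload starts just past the channel name's terminator. */
static const char *
sketch_payload(const SketchNotification *n)
{
    return n->data + n->channel_len + 1;
}
```

Storing the lengths up front lets a reader find the payload without scanning the channel name, and keeps each event in a single allocation.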
                                502                 :                : 
                                503                 :                : typedef struct NotificationList
                                504                 :                : {
                                505                 :                :     int         nestingLevel;   /* current transaction nesting depth */
                                506                 :                :     List       *events;         /* list of Notification structs */
                                507                 :                :     HTAB       *hashtab;        /* hash of NotificationHash structs, or NULL */
                                508                 :                :     List       *uniqueChannelNames; /* unique channel names being notified */
                                509                 :                :     HTAB       *uniqueChannelHash;  /* hash of unique channel names, or NULL */
                                510                 :                :     struct NotificationList *upper; /* details for upper transaction levels */
                                511                 :                : } NotificationList;
                                512                 :                : 
                                513                 :                : #define MIN_HASHABLE_NOTIFIES 16    /* threshold to build hashtab */
                                514                 :                : 
                                515                 :                : struct NotificationHash
                                516                 :                : {
                                517                 :                :     Notification *event;        /* => the actual Notification struct */
                                518                 :                : };
                                519                 :                : 
                                520                 :                : static NotificationList *pendingNotifies = NULL;
                                521                 :                : 
                                522                 :                : /*
                                523                 :                :  * Hash entry in NotificationList.uniqueChannelHash or localChannelTable
                                524                 :                :  * (both just carry the channel name, with no payload).
                                525                 :                :  */
                                526                 :                : typedef struct ChannelName
                                527                 :                : {
                                528                 :                :     char        channel[NAMEDATALEN];   /* hash key */
                                529                 :                : } ChannelName;
                                530                 :                : 
                                531                 :                : /*
                                532                 :                :  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
                                533                 :                :  * called from inside a signal handler. That just sets the
                                534                 :                :  * notifyInterruptPending flag and sets the process
                                535                 :                :  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
                                536                 :                :  * actually deal with the interrupt.
                                537                 :                :  */
                                538                 :                : volatile sig_atomic_t notifyInterruptPending = false;
                                539                 :                : 
                                540                 :                : /* True if we've registered an on_shmem_exit cleanup */
                                541                 :                : static bool unlistenExitRegistered = false;
                                542                 :                : 
                                543                 :                : /* True if we're currently registered as a listener in asyncQueueControl */
                                544                 :                : static bool amRegisteredListener = false;
                                545                 :                : 
                                546                 :                : /*
                                547                 :                :  * Queue head positions for direct advancement.
                                548                 :                :  * These are captured during PreCommit_Notify while holding the heavyweight
                                549                 :                :  * lock on database 0, ensuring no other backend can insert notifications
                                550                 :                :  * between them.  SignalBackends uses these to advance idle backends.
                                551                 :                :  */
                                552                 :                : static QueuePosition queueHeadBeforeWrite;
                                553                 :                : static QueuePosition queueHeadAfterWrite;
                                554                 :                : 
                                555                 :                : /*
                                556                 :                :  * Workspace arrays for SignalBackends.  These are preallocated in
                                557                 :                :  * PreCommit_Notify to avoid needing memory allocation after committing to
                                558                 :                :  * clog.
                                559                 :                :  */
                                560                 :                : static int32 *signalPids = NULL;
                                561                 :                : static ProcNumber *signalProcnos = NULL;
                                562                 :                : 
                                563                 :                : /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
                                564                 :                : static bool tryAdvanceTail = false;
                                565                 :                : 
                                566                 :                : /* GUC parameters */
                                567                 :                : bool        Trace_notify = false;
                                568                 :                : 
                                569                 :                : /* For 8 KB pages this gives 8 GB of disk space */
                                570                 :                : int         max_notify_queue_pages = 1048576;
                                571                 :                : 
                                572                 :                : /* local function prototypes */
                                573                 :                : static int  asyncQueueErrdetailForIoError(const void *opaque_data);
                                574                 :                : static inline int64 asyncQueuePageDiff(int64 p, int64 q);
                                575                 :                : static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
                                576                 :                : static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
                                577                 :                :                                         const char *channel);
                                578                 :                : static dshash_hash globalChannelTableHash(const void *key, size_t size,
                                579                 :                :                                           void *arg);
                                580                 :                : static void initGlobalChannelTable(void);
                                581                 :                : static void initLocalChannelTable(void);
                                582                 :                : static void queue_listen(ListenActionKind action, const char *channel);
                                583                 :                : static void Async_UnlistenOnExit(int code, Datum arg);
                                584                 :                : static void BecomeRegisteredListener(void);
                                585                 :                : static void PrepareTableEntriesForListen(const char *channel);
                                586                 :                : static void PrepareTableEntriesForUnlisten(const char *channel);
                                587                 :                : static void PrepareTableEntriesForUnlistenAll(void);
                                588                 :                : static void RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
                                589                 :                :                                       ListenerEntry *listeners,
                                590                 :                :                                       int idx);
                                591                 :                : static void ApplyPendingListenActions(bool isCommit);
                                592                 :                : static void CleanupListenersOnExit(void);
                                593                 :                : static bool IsListeningOn(const char *channel);
                                594                 :                : static void asyncQueueUnregister(void);
                                595                 :                : static bool asyncQueueIsFull(void);
                                596                 :                : static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
                                597                 :                : static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
                                598                 :                : static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
                                599                 :                : static double asyncQueueUsage(void);
                                600                 :                : static void asyncQueueFillWarning(void);
                                601                 :                : static void SignalBackends(void);
                                602                 :                : static void asyncQueueReadAllNotifications(void);
                                603                 :                : static bool asyncQueueProcessPageEntries(QueuePosition *current,
                                604                 :                :                                          QueuePosition stop,
                                605                 :                :                                          Snapshot snapshot);
                                606                 :                : static void asyncQueueAdvanceTail(void);
                                607                 :                : static void ProcessIncomingNotify(bool flush);
                                608                 :                : static bool AsyncExistsPendingNotify(Notification *n);
                                609                 :                : static void AddEventToPendingNotifies(Notification *n);
                                610                 :                : static uint32 notification_hash(const void *key, Size keysize);
                                611                 :                : static int  notification_match(const void *key1, const void *key2, Size keysize);
                                612                 :                : static void ClearPendingActionsAndNotifies(void);
                                613                 :                : 
                                614                 :                : static int
    2 heikki.linnakangas@i      615                 :UNC           0 : asyncQueueErrdetailForIoError(const void *opaque_data)
                                616                 :                : {
                                617                 :              0 :     const QueuePosition *position = opaque_data;
                                618                 :                : 
                                619                 :              0 :     return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
                                620                 :              0 :                      position->page, position->offset);
                                621                 :                : }
                                622                 :                : 
                                623                 :                : /*
                                624                 :                :  * Compute the difference between two queue page numbers.
                                625                 :                :  * Previously this function accounted for a wraparound.
                                626                 :                :  */
                                627                 :                : static inline int64
  837 akorotkov@postgresql      628                 :UBC           0 : asyncQueuePageDiff(int64 p, int64 q)
                                629                 :                : {
                                630                 :              0 :     return p - q;
                                631                 :                : }
                                632                 :                : 
                                633                 :                : /*
                                634                 :                :  * Determines whether p precedes q.
                                635                 :                :  * Previously this function accounted for a wraparound.
                                636                 :                :  */
                                637                 :                : static inline bool
  837 akorotkov@postgresql      638                 :CBC         144 : asyncQueuePagePrecedes(int64 p, int64 q)
                                639                 :                : {
                                640                 :            144 :     return p < q;
                                641                 :                : }
                                642                 :                : 
                                643                 :                : /*
                                644                 :                :  * GlobalChannelKeyInit
                                645                 :                :  *      Prepare a global channel table key for hashing.
                                646                 :                :  */
                                647                 :                : static inline void
   59 tgl@sss.pgh.pa.us         648                 :GNC         206 : GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid, const char *channel)
                                649                 :                : {
                                650                 :            206 :     memset(key, 0, sizeof(GlobalChannelKey));
                                651                 :            206 :     key->dboid = dboid;
                                652                 :            206 :     strlcpy(key->channel, channel, NAMEDATALEN);
                                653                 :            206 : }
                                654                 :                : 
                                655                 :                : /*
                                656                 :                :  * globalChannelTableHash
                                657                 :                :  *      Hash function for global channel table keys.
                                658                 :                :  */
                                659                 :                : static dshash_hash
                                660                 :            206 : globalChannelTableHash(const void *key, size_t size, void *arg)
                                661                 :                : {
                                662                 :            206 :     const GlobalChannelKey *k = (const GlobalChannelKey *) key;
                                663                 :                :     dshash_hash h;
                                664                 :                : 
                                665                 :            206 :     h = DatumGetUInt32(hash_uint32(k->dboid));
                                666                 :            206 :     h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
                                667                 :            206 :                                  strnlen(k->channel, NAMEDATALEN)));
                                668                 :                : 
                                669                 :            206 :     return h;
                                670                 :                : }
                                671                 :                : 
                                672                 :                : /* parameters for the global channel table */
                                673                 :                : static const dshash_parameters globalChannelTableDSHParams = {
                                674                 :                :     sizeof(GlobalChannelKey),
                                675                 :                :     sizeof(GlobalChannelEntry),
                                676                 :                :     dshash_memcmp,
                                677                 :                :     globalChannelTableHash,
                                678                 :                :     dshash_memcpy,
                                679                 :                :     LWTRANCHE_NOTIFY_CHANNEL_HASH
                                680                 :                : };
                                681                 :                : 
                                682                 :                : /*
                                683                 :                :  * initGlobalChannelTable
                                684                 :                :  *      Lazy initialization of the global channel table.
                                685                 :                :  */
                                686                 :                : static void
                                687                 :            146 : initGlobalChannelTable(void)
                                688                 :                : {
                                689                 :                :     MemoryContext oldcontext;
                                690                 :                : 
                                691                 :                :     /* Quick exit if we already did this */
                                692         [ +  + ]:            146 :     if (asyncQueueControl->globalChannelTableDSH != DSHASH_HANDLE_INVALID &&
                                693         [ +  + ]:            139 :         globalChannelTable != NULL)
                                694                 :            127 :         return;
                                695                 :                : 
                                696                 :                :     /* Otherwise, use a lock to ensure only one process creates the table */
                                697                 :             19 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
                                698                 :                : 
                                699                 :                :     /* Be sure any local memory allocated by DSA routines is persistent */
                                700                 :             19 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                                701                 :                : 
                                702         [ +  + ]:             19 :     if (asyncQueueControl->globalChannelTableDSH == DSHASH_HANDLE_INVALID)
                                703                 :                :     {
                                704                 :                :         /* Initialize dynamic shared hash table for global channels */
                                705                 :              7 :         globalChannelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
                                706                 :              7 :         dsa_pin(globalChannelDSA);
                                707                 :              7 :         dsa_pin_mapping(globalChannelDSA);
                                708                 :              7 :         globalChannelTable = dshash_create(globalChannelDSA,
                                709                 :                :                                            &globalChannelTableDSHParams,
                                710                 :                :                                            NULL);
                                711                 :                : 
                                712                 :                :         /* Store handles in shared memory for other backends to use */
                                713                 :              7 :         asyncQueueControl->globalChannelTableDSA = dsa_get_handle(globalChannelDSA);
                                714                 :              7 :         asyncQueueControl->globalChannelTableDSH =
                                715                 :              7 :             dshash_get_hash_table_handle(globalChannelTable);
                                716                 :                :     }
                                717         [ +  - ]:             12 :     else if (!globalChannelTable)
                                718                 :                :     {
                                719                 :                :         /* Attach to existing dynamic shared hash table */
                                720                 :             12 :         globalChannelDSA = dsa_attach(asyncQueueControl->globalChannelTableDSA);
                                721                 :             12 :         dsa_pin_mapping(globalChannelDSA);
                                722                 :             12 :         globalChannelTable = dshash_attach(globalChannelDSA,
                                723                 :                :                                            &globalChannelTableDSHParams,
                                724                 :             12 :                                            asyncQueueControl->globalChannelTableDSH,
                                725                 :                :                                            NULL);
                                726                 :                :     }
                                727                 :                : 
                                728                 :             19 :     MemoryContextSwitchTo(oldcontext);
                                729                 :             19 :     LWLockRelease(NotifyQueueLock);
                                730                 :                : }
                                731                 :                : 
                                732                 :                : /*
                                733                 :                :  * initLocalChannelTable
                                734                 :                :  *      Lazy initialization of the local channel table.
                                735                 :                :  *      Once created, this table lasts for the life of the session.
                                736                 :                :  */
                                737                 :                : static void
                                738                 :             91 : initLocalChannelTable(void)
                                739                 :                : {
                                740                 :                :     HASHCTL     hash_ctl;
                                741                 :                : 
                                742                 :                :     /* Quick exit if we already did this */
                                743         [ +  + ]:             91 :     if (localChannelTable != NULL)
                                744                 :             74 :         return;
                                745                 :                : 
                                746                 :                :     /* Initialize local hash table for this backend's listened channels */
                                747                 :             17 :     hash_ctl.keysize = NAMEDATALEN;
                                748                 :             17 :     hash_ctl.entrysize = sizeof(ChannelName);
                                749                 :                : 
                                750                 :             17 :     localChannelTable =
                                751                 :             17 :         hash_create("Local Listen Channels",
                                752                 :                :                     64,
                                753                 :                :                     &hash_ctl,
                                754                 :                :                     HASH_ELEM | HASH_STRINGS);
                                755                 :                : }
                                756                 :                : 
                                757                 :                : /*
                                758                 :                :  * initPendingListenActions
                                759                 :                :  *      Lazy initialization of the pending listen actions hash table.
                                760                 :                :  *      This is allocated in CurTransactionContext during PreCommit_Notify,
                                761                 :                :  *      and destroyed at transaction end.
                                762                 :                :  */
                                763                 :                : static void
                                764                 :             91 : initPendingListenActions(void)
                                765                 :                : {
                                766                 :                :     HASHCTL     hash_ctl;
                                767                 :                : 
                                768         [ -  + ]:             91 :     if (pendingListenActions != NULL)
   59 tgl@sss.pgh.pa.us         769                 :UNC           0 :         return;
                                770                 :                : 
   59 tgl@sss.pgh.pa.us         771                 :GNC          91 :     hash_ctl.keysize = NAMEDATALEN;
                                772                 :             91 :     hash_ctl.entrysize = sizeof(PendingListenEntry);
                                773                 :             91 :     hash_ctl.hcxt = CurTransactionContext;
                                774                 :                : 
                                775                 :             91 :     pendingListenActions =
                                776                 :             91 :         hash_create("Pending Listen Actions",
                                777                 :             91 :                     list_length(pendingActions->actions),
                                778                 :                :                     &hash_ctl,
                                779                 :                :                     HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
                                780                 :                : }
                                781                 :                : 
                                782                 :                : /*
                                783                 :                :  * Report space needed for our shared memory area
                                784                 :                :  */
                                785                 :                : Size
 5871 tgl@sss.pgh.pa.us         786                 :CBC        2147 : AsyncShmemSize(void)
                                787                 :                : {
                                788                 :                :     Size        size;
                                789                 :                : 
                                790                 :                :     /* This had better match AsyncShmemInit */
  742 heikki.linnakangas@i      791                 :           2147 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
 4040 tgl@sss.pgh.pa.us         792                 :           2147 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
                                793                 :                : 
  746 alvherre@alvh.no-ip.      794                 :           2147 :     size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
                                795                 :                : 
 5871 tgl@sss.pgh.pa.us         796                 :           2147 :     return size;
                                797                 :                : }
                                798                 :                : 
                                799                 :                : /*
                                800                 :                :  * Initialize our shared memory area
                                801                 :                :  */
                                802                 :                : void
                                803                 :           1150 : AsyncShmemInit(void)
                                804                 :                : {
                                805                 :                :     bool        found;
                                806                 :                :     Size        size;
                                807                 :                : 
                                808                 :                :     /*
                                809                 :                :      * Create or attach to the AsyncQueueControl structure.
                                810                 :                :      */
  742 heikki.linnakangas@i      811                 :           1150 :     size = mul_size(MaxBackends, sizeof(QueueBackendStatus));
 4040 tgl@sss.pgh.pa.us         812                 :           1150 :     size = add_size(size, offsetof(AsyncQueueControl, backend));
                                813                 :                : 
 5871                           814                 :           1150 :     asyncQueueControl = (AsyncQueueControl *)
                                815                 :           1150 :         ShmemInitStruct("Async Queue Control", size, &found);
                                816                 :                : 
                                817         [ +  - ]:           1150 :     if (!found)
                                818                 :                :     {
                                819                 :                :         /* First time through, so initialize it */
                                820                 :           1150 :         SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
                                821                 :           1150 :         SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
 1933                           822                 :           1150 :         QUEUE_STOP_PAGE = 0;
  742 heikki.linnakangas@i      823                 :           1150 :         QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 5871 tgl@sss.pgh.pa.us         824                 :           1150 :         asyncQueueControl->lastQueueFillWarn = 0;
   59 tgl@sss.pgh.pa.us         825                 :GNC        1150 :         asyncQueueControl->globalChannelTableDSA = DSA_HANDLE_INVALID;
                                826                 :           1150 :         asyncQueueControl->globalChannelTableDSH = DSHASH_HANDLE_INVALID;
  742 heikki.linnakangas@i      827         [ +  + ]:CBC      106834 :         for (int i = 0; i < MaxBackends; i++)
                                828                 :                :         {
 5871 tgl@sss.pgh.pa.us         829                 :         105684 :             QUEUE_BACKEND_PID(i) = InvalidPid;
 3819                           830                 :         105684 :             QUEUE_BACKEND_DBOID(i) = InvalidOid;
  742 heikki.linnakangas@i      831                 :         105684 :             QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 5871 tgl@sss.pgh.pa.us         832                 :         105684 :             SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
   59 tgl@sss.pgh.pa.us         833                 :GNC      105684 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
                                834                 :         105684 :             QUEUE_BACKEND_IS_ADVANCING(i) = false;
                                835                 :                :         }
                                836                 :                :     }
                                837                 :                : 
                                838                 :                :     /*
                                839                 :                :      * Set up SLRU management of the pg_notify data. Note that long segment
                                840                 :                :      * names are used in order to avoid wraparound.
                                841                 :                :      */
 2130 tgl@sss.pgh.pa.us         842                 :CBC        1150 :     NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
    2 heikki.linnakangas@i      843                 :GNC        1150 :     NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
  746 alvherre@alvh.no-ip.      844                 :CBC        1150 :     SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
                                845                 :                :                   "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
                                846                 :                :                   SYNC_HANDLER_NONE, true);
                                847                 :                : 
 5871 tgl@sss.pgh.pa.us         848         [ +  - ]:           1150 :     if (!found)
                                849                 :                :     {
                                850                 :                :         /*
                                851                 :                :          * During start or reboot, clean out the pg_notify directory.
                                852                 :                :          */
 2130                           853                 :           1150 :         (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
                                854                 :                :     }
 5871                           855                 :           1150 : }
                                856                 :                : 
                                857                 :                : 
                                858                 :                : /*
                                859                 :                :  * pg_notify -
                                860                 :                :  *    SQL function to send a notification event
                                861                 :                :  */
                                862                 :                : Datum
                                863                 :           1072 : pg_notify(PG_FUNCTION_ARGS)
                                864                 :                : {
                                865                 :                :     const char *channel;
                                866                 :                :     const char *payload;
                                867                 :                : 
                                868         [ +  + ]:           1072 :     if (PG_ARGISNULL(0))
                                869                 :              3 :         channel = "";
                                870                 :                :     else
                                871                 :           1069 :         channel = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                872                 :                : 
                                873         [ +  + ]:           1072 :     if (PG_ARGISNULL(1))
                                874                 :              6 :         payload = "";
                                875                 :                :     else
                                876                 :           1066 :         payload = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                877                 :                : 
                                878                 :                :     /* For NOTIFY as a statement, this is checked in ProcessUtility */
 5867                           879                 :           1072 :     PreventCommandDuringRecovery("NOTIFY");
                                880                 :                : 
 5871                           881                 :           1072 :     Async_Notify(channel, payload);
                                882                 :                : 
                                883                 :           1063 :     PG_RETURN_VOID();
                                884                 :                : }
                                885                 :                : 
                                886                 :                : 
                                887                 :                : /*
                                888                 :                :  * Async_Notify
                                889                 :                :  *
                                 890                 :                :  *      This is executed by the SQL NOTIFY command and by pg_notify().
                                891                 :                :  *
                                892                 :                :  *      Adds the message to the list of pending notifies.
                                893                 :                :  *      Actual notification happens during transaction commit.
                                894                 :                :  *      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                                895                 :                :  */
                                896                 :                : void
                                897                 :           1128 : Async_Notify(const char *channel, const char *payload)
                                898                 :                : {
 2354 rhaas@postgresql.org      899                 :           1128 :     int         my_level = GetCurrentTransactionNestLevel();
                                900                 :                :     size_t      channel_len;
                                901                 :                :     size_t      payload_len;
                                902                 :                :     Notification *n;
                                903                 :                :     MemoryContext oldcontext;
                                904                 :                : 
 3797                           905         [ -  + ]:           1128 :     if (IsParallelWorker())
 3797 rhaas@postgresql.org      906         [ #  # ]:UBC           0 :         elog(ERROR, "cannot send notifications from a parallel worker");
                                907                 :                : 
 9419 peter_e@gmx.net           908         [ -  + ]:CBC        1128 :     if (Trace_notify)
 5871 tgl@sss.pgh.pa.us         909         [ #  # ]:UBC           0 :         elog(DEBUG1, "Async_Notify(%s)", channel);
                                910                 :                : 
 2404 tgl@sss.pgh.pa.us         911         [ +  - ]:CBC        1128 :     channel_len = channel ? strlen(channel) : 0;
                                912         [ +  + ]:           1128 :     payload_len = payload ? strlen(payload) : 0;
                                913                 :                : 
                                914                 :                :     /* a channel name must be specified */
                                915         [ +  + ]:           1128 :     if (channel_len == 0)
 5871                           916         [ +  - ]:              6 :         ereport(ERROR,
                                917                 :                :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                918                 :                :                  errmsg("channel name cannot be empty")));
                                919                 :                : 
                                920                 :                :     /* enforce length limits */
 2404                           921         [ +  + ]:           1122 :     if (channel_len >= NAMEDATALEN)
 5871                           922         [ +  - ]:              3 :         ereport(ERROR,
                                923                 :                :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                924                 :                :                  errmsg("channel name too long")));
                                925                 :                : 
 2404                           926         [ -  + ]:           1119 :     if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH)
 2404 tgl@sss.pgh.pa.us         927         [ #  # ]:UBC           0 :         ereport(ERROR,
                                928                 :                :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                929                 :                :                  errmsg("payload string too long")));
                                930                 :                : 
                                931                 :                :     /*
                                932                 :                :      * We must construct the Notification entry, even if we end up not using
                                933                 :                :      * it, in order to compare it cheaply to existing list entries.
                                934                 :                :      *
                                935                 :                :      * The notification list needs to live until end of transaction, so store
                                936                 :                :      * it in the transaction context.
                                937                 :                :      */
 5871 tgl@sss.pgh.pa.us         938                 :CBC        1119 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
                                939                 :                : 
 2404                           940                 :           1119 :     n = (Notification *) palloc(offsetof(Notification, data) +
                                941                 :           1119 :                                 channel_len + payload_len + 2);
                                942                 :           1119 :     n->channel_len = channel_len;
                                943                 :           1119 :     n->payload_len = payload_len;
                                944                 :           1119 :     strcpy(n->data, channel);
 5871                           945         [ +  + ]:           1119 :     if (payload)
 2404                           946                 :           1105 :         strcpy(n->data + channel_len + 1, payload);
                                947                 :                :     else
                                948                 :             14 :         n->data[channel_len + 1] = '\0';
                                949                 :                : 
 2354 rhaas@postgresql.org      950   [ +  +  +  + ]:           1119 :     if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel)
 2404 tgl@sss.pgh.pa.us         951                 :             57 :     {
                                952                 :                :         NotificationList *notifies;
                                953                 :                : 
                                954                 :                :         /*
                                955                 :                :          * First notify event in current (sub)xact. Note that we allocate the
                                956                 :                :          * NotificationList in TopTransactionContext; the nestingLevel might
                                957                 :                :          * get changed later by AtSubCommit_Notify.
                                958                 :                :          */
                                959                 :                :         notifies = (NotificationList *)
 2354 rhaas@postgresql.org      960                 :             57 :             MemoryContextAlloc(TopTransactionContext,
                                961                 :                :                                sizeof(NotificationList));
                                962                 :             57 :         notifies->nestingLevel = my_level;
                                963                 :             57 :         notifies->events = list_make1(n);
                                964                 :                :         /* We certainly don't need a hashtable yet */
                                965                 :             57 :         notifies->hashtab = NULL;
                                966                 :                :         /* We won't build uniqueChannelNames/Hash till later, either */
   59 tgl@sss.pgh.pa.us         967                 :GNC          57 :         notifies->uniqueChannelNames = NIL;
                                968                 :             57 :         notifies->uniqueChannelHash = NULL;
 2354 rhaas@postgresql.org      969                 :CBC          57 :         notifies->upper = pendingNotifies;
                                970                 :             57 :         pendingNotifies = notifies;
                                971                 :                :     }
                                972                 :                :     else
                                973                 :                :     {
                                974                 :                :         /* Now check for duplicates */
                                975         [ +  + ]:           1062 :         if (AsyncExistsPendingNotify(n))
                                976                 :                :         {
                                977                 :                :             /* It's a dup, so forget it */
                                978                 :             13 :             pfree(n);
                                979                 :             13 :             MemoryContextSwitchTo(oldcontext);
                                980                 :             13 :             return;
                                981                 :                :         }
                                982                 :                : 
                                983                 :                :         /* Append more events to existing list */
 2404 tgl@sss.pgh.pa.us         984                 :           1049 :         AddEventToPendingNotifies(n);
                                985                 :                :     }
                                986                 :                : 
 5871                           987                 :           1106 :     MemoryContextSwitchTo(oldcontext);
                                988                 :                : }
                                989                 :                : 
                                990                 :                : /*
                                991                 :                :  * queue_listen
                                992                 :                :  *      Common code for listen, unlisten, unlisten all commands.
                                993                 :                :  *
                                994                 :                :  *      Adds the request to the list of pending actions.
                                995                 :                :  *      Actual update of localChannelTable and globalChannelTable happens during
                                996                 :                :  *      PreCommit_Notify, with staged changes committed in AtCommit_Notify.
                                997                 :                :  */
                                998                 :                : static void
                                999                 :            110 : queue_listen(ListenActionKind action, const char *channel)
                               1000                 :                : {
                               1001                 :                :     MemoryContext oldcontext;
                               1002                 :                :     ListenAction *actrec;
 2354 rhaas@postgresql.org     1003                 :            110 :     int         my_level = GetCurrentTransactionNestLevel();
                               1004                 :                : 
                               1005                 :                :     /*
                               1006                 :                :      * Unlike Async_Notify, we don't try to collapse out duplicates here. We
                               1007                 :                :      * keep the ordered list to preserve interactions like UNLISTEN ALL; the
                               1008                 :                :      * final per-channel intent is computed during PreCommit_Notify.
                               1009                 :                :      */
 6577 tgl@sss.pgh.pa.us        1010                 :            110 :     oldcontext = MemoryContextSwitchTo(CurTransactionContext);
                               1011                 :                : 
                                1012                 :                :     /* the trailing "+ 1" leaves room for the terminating null */
 4040                          1013                 :            110 :     actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
                               1014                 :            110 :                                      strlen(channel) + 1);
 6577                          1015                 :            110 :     actrec->action = action;
 5871                          1016                 :            110 :     strcpy(actrec->channel, channel);
                               1017                 :                : 
 2354 rhaas@postgresql.org     1018   [ +  +  +  + ]:            110 :     if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
                               1019                 :             94 :     {
                               1020                 :                :         ActionList *actions;
                               1021                 :                : 
                               1022                 :                :         /*
                                1023                 :                :          * First action in current (sub)xact. Note that we allocate the
                               1024                 :                :          * ActionList in TopTransactionContext; the nestingLevel might get
                               1025                 :                :          * changed later by AtSubCommit_Notify.
                               1026                 :                :          */
                               1027                 :                :         actions = (ActionList *)
                               1028                 :             94 :             MemoryContextAlloc(TopTransactionContext, sizeof(ActionList));
                               1029                 :             94 :         actions->nestingLevel = my_level;
                               1030                 :             94 :         actions->actions = list_make1(actrec);
                               1031                 :             94 :         actions->upper = pendingActions;
                               1032                 :             94 :         pendingActions = actions;
                               1033                 :                :     }
                               1034                 :                :     else
                               1035                 :             16 :         pendingActions->actions = lappend(pendingActions->actions, actrec);
                               1036                 :                : 
 6577 tgl@sss.pgh.pa.us        1037                 :            110 :     MemoryContextSwitchTo(oldcontext);
                               1038                 :            110 : }
                               1039                 :                : 
                               1040                 :                : /*
                               1041                 :                :  * Async_Listen
                               1042                 :                :  *
                               1043                 :                :  *      This is executed by the SQL listen command.
                               1044                 :                :  */
                               1045                 :                : void
 5871                          1046                 :             58 : Async_Listen(const char *channel)
                               1047                 :                : {
 6577                          1048         [ -  + ]:             58 :     if (Trace_notify)
 5871 tgl@sss.pgh.pa.us        1049         [ #  # ]:UBC           0 :         elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
                               1050                 :                : 
 5871 tgl@sss.pgh.pa.us        1051                 :CBC          58 :     queue_listen(LISTEN_LISTEN, channel);
 6577                          1052                 :             58 : }
                               1053                 :                : 
                               1054                 :                : /*
                               1055                 :                :  * Async_Unlisten
                               1056                 :                :  *
                               1057                 :                :  *      This is executed by the SQL unlisten command.
                               1058                 :                :  */
                               1059                 :                : void
 5871                          1060                 :              3 : Async_Unlisten(const char *channel)
                               1061                 :                : {
 6406                          1062         [ -  + ]:              3 :     if (Trace_notify)
 5871 tgl@sss.pgh.pa.us        1063         [ #  # ]:UBC           0 :         elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
                               1064                 :                : 
                               1065                 :                :     /* If we couldn't possibly be listening, no need to queue anything */
 2354 rhaas@postgresql.org     1066   [ +  -  -  + ]:CBC           3 :     if (pendingActions == NULL && !unlistenExitRegistered)
 6239 tgl@sss.pgh.pa.us        1067                 :UBC           0 :         return;
                               1068                 :                : 
 5871 tgl@sss.pgh.pa.us        1069                 :CBC           3 :     queue_listen(LISTEN_UNLISTEN, channel);
                               1070                 :                : }
                               1071                 :                : 
                               1072                 :                : /*
                               1073                 :                :  * Async_UnlistenAll
                               1074                 :                :  *
                               1075                 :                :  *      This is invoked by UNLISTEN * command, and also at backend exit.
                               1076                 :                :  */
                               1077                 :                : void
 6577                          1078                 :             78 : Async_UnlistenAll(void)
                               1079                 :                : {
                               1080         [ -  + ]:             78 :     if (Trace_notify)
 6577 tgl@sss.pgh.pa.us        1081         [ #  # ]:UBC           0 :         elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
                               1082                 :                : 
                               1083                 :                :     /* If we couldn't possibly be listening, no need to queue anything */
 2354 rhaas@postgresql.org     1084   [ +  +  +  + ]:CBC          78 :     if (pendingActions == NULL && !unlistenExitRegistered)
 6239 tgl@sss.pgh.pa.us        1085                 :             29 :         return;
                               1086                 :                : 
 6577                          1087                 :             49 :     queue_listen(LISTEN_UNLISTEN_ALL, "");
                               1088                 :                : }
                               1089                 :                : 
                               1090                 :                : /*
                               1091                 :                :  * SQL function: return a set of the channel names this backend is actively
                               1092                 :                :  * listening to.
                               1093                 :                :  *
                               1094                 :                :  * Note: this coding relies on the fact that the localChannelTable cannot
                               1095                 :                :  * change within a transaction.
                               1096                 :                :  */
                               1097                 :                : Datum
 5871                          1098                 :              9 : pg_listening_channels(PG_FUNCTION_ARGS)
                               1099                 :                : {
                               1100                 :                :     FuncCallContext *funcctx;
                               1101                 :                :     HASH_SEQ_STATUS *status;
                               1102                 :                : 
                               1103                 :                :     /* stuff done only on the first call of the function */
                               1104         [ +  + ]:              9 :     if (SRF_IS_FIRSTCALL())
                               1105                 :                :     {
                               1106                 :                :         /* create a function context for cross-call persistence */
                               1107                 :              6 :         funcctx = SRF_FIRSTCALL_INIT();
                               1108                 :                : 
                               1109                 :                :         /* Initialize hash table iteration if we have any channels */
   59 tgl@sss.pgh.pa.us        1110         [ +  - ]:GNC           6 :         if (localChannelTable != NULL)
                               1111                 :                :         {
                               1112                 :                :             MemoryContext oldcontext;
                               1113                 :                : 
                               1114                 :              6 :             oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
                               1115                 :              6 :             status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
                               1116                 :              6 :             hash_seq_init(status, localChannelTable);
                               1117                 :              6 :             funcctx->user_fctx = status;
                               1118                 :              6 :             MemoryContextSwitchTo(oldcontext);
                               1119                 :                :         }
                               1120                 :                :         else
                               1121                 :                :         {
   59 tgl@sss.pgh.pa.us        1122                 :UNC           0 :             funcctx->user_fctx = NULL;
                               1123                 :                :         }
                               1124                 :                :     }
                               1125                 :                : 
                               1126                 :                :     /* stuff done on every call of the function */
 5871 tgl@sss.pgh.pa.us        1127                 :CBC           9 :     funcctx = SRF_PERCALL_SETUP();
   59 tgl@sss.pgh.pa.us        1128                 :GNC           9 :     status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
                               1129                 :                : 
                               1130         [ +  - ]:              9 :     if (status != NULL)
                               1131                 :                :     {
                               1132                 :                :         ChannelName *entry;
                               1133                 :                : 
                               1134                 :              9 :         entry = (ChannelName *) hash_seq_search(status);
                               1135         [ +  + ]:              9 :         if (entry != NULL)
                               1136                 :              3 :             SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
                               1137                 :                :     }
                               1138                 :                : 
 5871 tgl@sss.pgh.pa.us        1139                 :CBC           6 :     SRF_RETURN_DONE(funcctx);
                               1140                 :                : }
                               1141                 :                : 
                               1142                 :                : /*
                               1143                 :                :  * Async_UnlistenOnExit
                               1144                 :                :  *
                               1145                 :                :  * This is executed at backend exit if we have done any LISTENs in this
                               1146                 :                :  * backend.  It might not be necessary anymore, if the user UNLISTENed
                               1147                 :                :  * everything, but we don't try to detect that case.
                               1148                 :                :  */
                               1149                 :                : static void
 6577                          1150                 :             17 : Async_UnlistenOnExit(int code, Datum arg)
                               1151                 :                : {
   59 tgl@sss.pgh.pa.us        1152                 :GNC          17 :     CleanupListenersOnExit();
 4778 tgl@sss.pgh.pa.us        1153                 :CBC          17 :     asyncQueueUnregister();
 6577                          1154                 :             17 : }
                               1155                 :                : 
                               1156                 :                : /*
                               1157                 :                :  * AtPrepare_Notify
                               1158                 :                :  *
                               1159                 :                :  *      This is called at the prepare phase of a two-phase
                               1160                 :                :  *      transaction.  Save the state for possible commit later.
                               1161                 :                :  */
                               1162                 :                : void
                               1163                 :            315 : AtPrepare_Notify(void)
                               1164                 :                : {
                               1165                 :                :     /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */
 5871                          1166   [ +  -  -  + ]:            315 :     if (pendingActions || pendingNotifies)
 6577 tgl@sss.pgh.pa.us        1167         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1168                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1169                 :                :                  errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
 5871 tgl@sss.pgh.pa.us        1170                 :CBC         315 : }
                               1171                 :                : 
                               1172                 :                : /*
                               1173                 :                :  * PreCommit_Notify
                               1174                 :                :  *
                               1175                 :                :  *      This is called at transaction commit, before actually committing to
                               1176                 :                :  *      clog.
                               1177                 :                :  *
                               1178                 :                :  *      If there are pending LISTEN actions, make sure we are listed in the
                               1179                 :                :  *      shared-memory listener array.  This must happen before commit to
                               1180                 :                :  *      ensure we don't miss any notifies from transactions that commit
                               1181                 :                :  *      just after ours.
                               1182                 :                :  *
                               1183                 :                :  *      If there are outbound notify requests in the pendingNotifies list,
                               1184                 :                :  *      add them to the global queue.  We do that before commit so that
                               1185                 :                :  *      we can still throw error if we run out of queue space.
                               1186                 :                :  */
                               1187                 :                : void
                               1188                 :         310613 : PreCommit_Notify(void)
                               1189                 :                : {
                               1190                 :                :     ListCell   *p;
                               1191                 :                : 
 2404                          1192   [ +  +  +  + ]:         310613 :     if (!pendingActions && !pendingNotifies)
 5871                          1193                 :         310467 :         return;                 /* no relevant statements in this xact */
                               1194                 :                : 
                               1195         [ -  + ]:            146 :     if (Trace_notify)
 5871 tgl@sss.pgh.pa.us        1196         [ #  # ]:UBC           0 :         elog(DEBUG1, "PreCommit_Notify");
                               1197                 :                : 
                               1198                 :                :     /* Preflight for any pending listen/unlisten actions */
   59 tgl@sss.pgh.pa.us        1199                 :GNC         146 :     initGlobalChannelTable();
                               1200                 :                : 
 2354 rhaas@postgresql.org     1201         [ +  + ]:CBC         146 :     if (pendingActions != NULL)
                               1202                 :                :     {
                               1203                 :                :         /* Ensure we have a local channel table */
   59 tgl@sss.pgh.pa.us        1204                 :GNC          91 :         initLocalChannelTable();
                               1205                 :                :         /* Create pendingListenActions hash table for this transaction */
                               1206                 :             91 :         initPendingListenActions();
                               1207                 :                : 
                               1208                 :                :         /* Stage all the actions this transaction wants to perform */
 2354 rhaas@postgresql.org     1209   [ +  -  +  +  :CBC         198 :         foreach(p, pendingActions->actions)
                                              +  + ]
                               1210                 :                :         {
                               1211                 :            107 :             ListenAction *actrec = (ListenAction *) lfirst(p);
                               1212                 :                : 
                               1213   [ +  +  +  - ]:            107 :             switch (actrec->action)
                               1214                 :                :             {
                               1215                 :             56 :                 case LISTEN_LISTEN:
   59 tgl@sss.pgh.pa.us        1216                 :GNC          56 :                     BecomeRegisteredListener();
                               1217                 :             56 :                     PrepareTableEntriesForListen(actrec->channel);
 2354 rhaas@postgresql.org     1218                 :CBC          56 :                     break;
                               1219                 :              3 :                 case LISTEN_UNLISTEN:
   59 tgl@sss.pgh.pa.us        1220                 :GNC           3 :                     PrepareTableEntriesForUnlisten(actrec->channel);
 2354 rhaas@postgresql.org     1221                 :CBC           3 :                     break;
                               1222                 :             48 :                 case LISTEN_UNLISTEN_ALL:
   59 tgl@sss.pgh.pa.us        1223                 :GNC          48 :                     PrepareTableEntriesForUnlistenAll();
 2354 rhaas@postgresql.org     1224                 :CBC          48 :                     break;
                               1225                 :                :             }
                               1226                 :                :         }
                               1227                 :                :     }
                               1228                 :                : 
                               1229                 :                :     /* Queue any pending notifies (must happen after the above) */
 5871 tgl@sss.pgh.pa.us        1230         [ +  + ]:            146 :     if (pendingNotifies)
                               1231                 :                :     {
                               1232                 :                :         ListCell   *nextNotify;
   59 tgl@sss.pgh.pa.us        1233                 :GNC          55 :         bool        firstIteration = true;
                               1234                 :                : 
                               1235                 :                :         /*
                               1236                 :                :          * Build list of unique channel names being notified for use by
                               1237                 :                :          * SignalBackends().
                               1238                 :                :          *
                               1239                 :                :          * If uniqueChannelHash is available, use it to efficiently get the
                               1240                 :                :          * unique channels.  Otherwise, fall back to the O(N^2) approach.
                               1241                 :                :          */
                               1242                 :             55 :         pendingNotifies->uniqueChannelNames = NIL;
                               1243         [ +  + ]:             55 :         if (pendingNotifies->uniqueChannelHash != NULL)
                               1244                 :                :         {
                               1245                 :                :             HASH_SEQ_STATUS status;
                               1246                 :                :             ChannelName *channelEntry;
                               1247                 :                : 
                               1248                 :              2 :             hash_seq_init(&status, pendingNotifies->uniqueChannelHash);
                               1249         [ +  + ]:              4 :             while ((channelEntry = (ChannelName *) hash_seq_search(&status)) != NULL)
                               1250                 :              2 :                 pendingNotifies->uniqueChannelNames =
                               1251                 :              2 :                     lappend(pendingNotifies->uniqueChannelNames,
                               1252                 :              2 :                             channelEntry->channel);
                               1253                 :                :         }
                               1254                 :                :         else
                               1255                 :                :         {
                               1256                 :                :             /* O(N^2) approach is better for small number of notifications */
                               1257   [ +  -  +  +  :            189 :             foreach_ptr(Notification, n, pendingNotifies->events)
                                              +  + ]
                               1258                 :                :             {
                               1259                 :             83 :                 char       *channel = n->data;
                               1260                 :             83 :                 bool        found = false;
                               1261                 :                : 
                               1262                 :                :                 /* Name present in list? */
                               1263   [ +  +  +  +  :            170 :                 foreach_ptr(char, oldchan, pendingNotifies->uniqueChannelNames)
                                              +  + ]
                               1264                 :                :                 {
                               1265         [ +  + ]:             31 :                     if (strcmp(oldchan, channel) == 0)
                               1266                 :                :                     {
                               1267                 :             27 :                         found = true;
                               1268                 :             27 :                         break;
                               1269                 :                :                     }
                               1270                 :                :                 }
                               1271                 :                :                 /* Add if not already in list */
                               1272         [ +  + ]:             83 :                 if (!found)
                               1273                 :             56 :                     pendingNotifies->uniqueChannelNames =
                               1274                 :             56 :                         lappend(pendingNotifies->uniqueChannelNames,
                               1275                 :                :                                 channel);
                               1276                 :                :             }
                               1277                 :                :         }
                               1278                 :                : 
                               1279                 :                :         /* Preallocate workspace that will be needed by SignalBackends() */
                               1280         [ +  + ]:             55 :         if (signalPids == NULL)
                               1281                 :             12 :             signalPids = MemoryContextAlloc(TopMemoryContext,
                               1282                 :                :                                             MaxBackends * sizeof(int32));
                               1283                 :                : 
                               1284         [ +  + ]:             55 :         if (signalProcnos == NULL)
                               1285                 :             12 :             signalProcnos = MemoryContextAlloc(TopMemoryContext,
                               1286                 :                :                                                MaxBackends * sizeof(ProcNumber));
                               1287                 :                : 
                               1288                 :                :         /*
                               1289                 :                :          * Make sure that we have an XID assigned to the current transaction.
                               1290                 :                :          * GetCurrentTransactionId is cheap if we already have an XID, but not
                               1291                 :                :          * so cheap if we don't, and we'd prefer not to do that work while
                               1292                 :                :          * holding NotifyQueueLock.
                               1293                 :                :          */
 5871 tgl@sss.pgh.pa.us        1294                 :CBC          55 :         (void) GetCurrentTransactionId();
                               1295                 :                : 
                               1296                 :                :         /*
                               1297                 :                :          * Serialize writers by acquiring a special lock that we hold till
                               1298                 :                :          * after commit.  This ensures that queue entries appear in commit
                               1299                 :                :          * order, and in particular that there are never uncommitted queue
                               1300                 :                :          * entries ahead of committed ones, so an uncommitted transaction
                               1301                 :                :          * can't block delivery of deliverable notifications.
                               1302                 :                :          *
                               1303                 :                :          * We use a heavyweight lock so that it'll automatically be released
                               1304                 :                :          * after either commit or abort.  This also allows deadlocks to be
                               1305                 :                :          * detected, though really a deadlock shouldn't be possible here.
                               1306                 :                :          *
                               1307                 :                :          * The lock is on "database 0", which is pretty ugly but it doesn't
                               1308                 :                :          * seem worth inventing a special locktag category just for this.
                               1309                 :                :          * (Historical note: before PG 9.0, a similar lock on "database 0" was
                               1310                 :                :          * used by the flatfiles mechanism.)
                               1311                 :                :          */
                               1312                 :             55 :         LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                               1313                 :                :                          AccessExclusiveLock);
                               1314                 :                : 
                               1315                 :                :         /*
                               1316                 :                :          * For the direct advancement optimization in SignalBackends(), we
                               1317                 :                :          * need to ensure that no other backend can insert queue entries
                               1318                 :                :          * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
                               1319                 :                :          * heavyweight lock above provides this guarantee, since it serializes
                               1320                 :                :          * all writers.
                               1321                 :                :          *
                               1322                 :                :          * Note: if the heavyweight lock were ever removed for scalability
                               1323                 :                :          * reasons, we could achieve the same guarantee by holding
                               1324                 :                :          * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
                               1325                 :                :          * than releasing and reacquiring it for each page as we do below.
                               1326                 :                :          */
                               1327                 :                : 
                               1328                 :                :         /* Initialize values to a safe default in case list is empty */
   59 tgl@sss.pgh.pa.us        1329                 :GNC          55 :         SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
                               1330                 :             55 :         SET_QUEUE_POS(queueHeadAfterWrite, 0, 0);
                               1331                 :                : 
                               1332                 :                :         /* Now push the notifications into the queue */
 2404 tgl@sss.pgh.pa.us        1333                 :CBC          55 :         nextNotify = list_head(pendingNotifies->events);
 5871                          1334         [ +  + ]:            145 :         while (nextNotify != NULL)
                               1335                 :                :         {
                               1336                 :                :             /*
                               1337                 :                :              * Add the pending notifications to the queue.  We acquire and
                               1338                 :                :              * release NotifyQueueLock once per page, which might be overkill
                               1339                 :                :              * but it does allow readers to get in while we're doing this.
                               1340                 :                :              *
                               1341                 :                :              * A full queue is very uncommon and should really not happen,
                               1342                 :                :              * given that we have so much space available in the SLRU pages.
                               1343                 :                :              * Nevertheless we need to deal with this possibility. Note that
                               1344                 :                :              * when we get here we are in the process of committing our
                               1345                 :                :              * transaction, but we have not yet committed to clog, so at this
                               1346                 :                :              * point in time we can still roll the transaction back.
                               1347                 :                :              */
 2130                          1348                 :             90 :             LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
   59 tgl@sss.pgh.pa.us        1349         [ +  + ]:GNC          90 :             if (firstIteration)
                               1350                 :                :             {
                               1351                 :             55 :                 queueHeadBeforeWrite = QUEUE_HEAD;
                               1352                 :             55 :                 firstIteration = false;
                               1353                 :                :             }
 5871 tgl@sss.pgh.pa.us        1354                 :CBC          90 :             asyncQueueFillWarning();
                               1355         [ -  + ]:             90 :             if (asyncQueueIsFull())
 5871 tgl@sss.pgh.pa.us        1356         [ #  # ]:UBC           0 :                 ereport(ERROR,
                               1357                 :                :                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                               1358                 :                :                          errmsg("too many notifications in the NOTIFY queue")));
 5871 tgl@sss.pgh.pa.us        1359                 :CBC          90 :             nextNotify = asyncQueueAddEntries(nextNotify);
   59 tgl@sss.pgh.pa.us        1360                 :GNC          90 :             queueHeadAfterWrite = QUEUE_HEAD;
 2130 tgl@sss.pgh.pa.us        1361                 :CBC          90 :             LWLockRelease(NotifyQueueLock);
                               1362                 :                :         }
                               1363                 :                : 
                               1364                 :                :         /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
                               1365                 :                :     }
                               1366                 :                : }
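
The fallback path in PreCommit_Notify builds the unique channel list by scanning the names kept so far for every pending notification. A minimal standalone sketch of that quadratic de-duplication is given here, using plain arrays instead of PostgreSQL List cells. The function name unique_channel_names and its signature are hypothetical and do not appear in async.c.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper, not part of async.c: copy each distinct string from
 * names[0..n-1] into out[], preserving first-seen order.  Returns the number
 * of distinct names.  out[] must have room for n pointers.
 */
static size_t
unique_channel_names(const char *const *names, size_t n, const char **out)
{
    size_t      nout = 0;

    for (size_t i = 0; i < n; i++)
    {
        bool        found = false;

        /* Linear scan of the names kept so far: O(N^2) overall */
        for (size_t j = 0; j < nout; j++)
        {
            if (strcmp(out[j], names[i]) == 0)
            {
                found = true;
                break;
            }
        }

        /* Add if not already in the output */
        if (!found)
            out[nout++] = names[i];
    }

    return nout;
}
```

For a handful of notifications the inner scan is short and needs no extra allocation, which is presumably why the listing only uses the hash table when uniqueChannelHash has already been built.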
                               1367                 :                : 
                               1368                 :                : /*
                               1369                 :                :  * AtCommit_Notify
                               1370                 :                :  *
                               1371                 :                :  *      This is called at transaction commit, after committing to clog.
                               1372                 :                :  *
                               1373                 :                :  *      Apply pending listen/unlisten changes and clear transaction-local state.
                               1374                 :                :  *
                               1375                 :                :  *      If we issued any notifications in the transaction, send signals to
                               1376                 :                :  *      listening backends (possibly including ourselves) to process them.
                               1377                 :                :  *      Also, if we filled enough queue pages with new notifies, try to
                               1378                 :                :  *      advance the queue tail pointer.
                               1379                 :                :  */
                               1380                 :                : void
 6577                          1381                 :         310458 : AtCommit_Notify(void)
                               1382                 :                : {
                               1383                 :                :     /*
                               1384                 :                :      * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
                               1385                 :                :      * return as soon as possible
                               1386                 :                :      */
 5871                          1387   [ +  +  +  + ]:         310458 :     if (!pendingActions && !pendingNotifies)
 6577                          1388                 :         310312 :         return;
                               1389                 :                : 
                               1390         [ -  + ]:            146 :     if (Trace_notify)
 6577 tgl@sss.pgh.pa.us        1391         [ #  # ]:UBC           0 :         elog(DEBUG1, "AtCommit_Notify");
                               1392                 :                : 
                               1393                 :                :     /* Apply staged listen/unlisten changes */
   59 tgl@sss.pgh.pa.us        1394                 :GNC         146 :     ApplyPendingListenActions(true);
                               1395                 :                : 
                               1396                 :                :     /* If no longer listening to anything, get out of listener array */
                               1397   [ +  +  +  -  :            146 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
                                              +  + ]
 4778 tgl@sss.pgh.pa.us        1398                 :CBC          23 :         asyncQueueUnregister();
                               1399                 :                : 
                               1400                 :                :     /*
                               1401                 :                :      * Send signals to listening backends.  We need to do this only if there are
                               1402                 :                :      * pending notifies, which were previously added to the shared queue by
                               1403                 :                :      * PreCommit_Notify().
                               1404                 :                :      */
 1643                          1405         [ +  + ]:            146 :     if (pendingNotifies != NULL)
                               1406                 :             55 :         SignalBackends();
                               1407                 :                : 
                               1408                 :                :     /*
                               1409                 :                :      * If it's time to try to advance the global tail pointer, do that.
                               1410                 :                :      *
                               1411                 :                :      * (It might seem odd to do this in the sender, when more than likely the
                               1412                 :                :      * listeners won't yet have read the messages we just sent.  However,
                               1413                 :                :      * there's less contention if only the sender does it, and there is little
                               1414                 :                :      * need for urgency in advancing the global tail.  So this typically will
                               1415                 :                :      * be clearing out messages that were sent some time ago.)
                               1416                 :                :      */
                               1417         [ +  + ]:            146 :     if (tryAdvanceTail)
                               1418                 :                :     {
                               1419                 :              8 :         tryAdvanceTail = false;
                               1420                 :              8 :         asyncQueueAdvanceTail();
                               1421                 :                :     }
                               1422                 :                : 
                               1423                 :                :     /* And clean up */
 5871                          1424                 :            146 :     ClearPendingActionsAndNotifies();
                               1425                 :                : }
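
PreCommit_Notify and AtCommit_Notify split the work into a step that may still raise an error (staging, before the commit is recorded) and a step that must not fail (applying, after it). The sketch below illustrates that stage-then-apply pattern in isolation. ListenState, stage_listen, apply_staged, and MAX_CHANNELS are all hypothetical names invented for this example, and treating a false flag as "abort, discard the staged changes" is an assumption rather than something shown in this part of the listing.

```c
#include <stdbool.h>
#include <stddef.h>

#define MAX_CHANNELS 8          /* arbitrary capacity for this sketch */

/* Hypothetical per-backend state, not the structures used by async.c */
typedef struct
{
    bool        listening[MAX_CHANNELS];    /* committed state */
    bool        staged[MAX_CHANNELS];       /* desired state if has_staged */
    bool        has_staged;
} ListenState;

/*
 * Pre-commit step: validate and stage a change.  Failure is still allowed
 * here, and the committed state is left untouched either way.
 */
static bool
stage_listen(ListenState *st, size_t channel, bool want)
{
    if (channel >= MAX_CHANNELS)
        return false;

    if (!st->has_staged)
    {
        /* Start from a copy of the committed state */
        for (size_t i = 0; i < MAX_CHANNELS; i++)
            st->staged[i] = st->listening[i];
        st->has_staged = true;
    }
    st->staged[channel] = want;
    return true;
}

/*
 * Post-commit or abort step: cannot fail.  On commit the staged state
 * becomes the committed state; on abort it is simply discarded.
 */
static void
apply_staged(ListenState *st, bool is_commit)
{
    if (st->has_staged && is_commit)
    {
        for (size_t i = 0; i < MAX_CHANNELS; i++)
            st->listening[i] = st->staged[i];
    }
    st->has_staged = false;
}
```

Keeping every fallible check in the staging step means the apply step does nothing but copy already-validated data, so it can safely run once the transaction can no longer be rolled back.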
                               1426                 :                : 
                               1427                 :                : /*
                               1428                 :                :  * BecomeRegisteredListener --- subroutine for PreCommit_Notify
                               1429                 :                :  *
                               1430                 :                :  * This function must make sure we are ready to catch any incoming messages.
                               1431                 :                :  */
                               1432                 :                : static void
   59 tgl@sss.pgh.pa.us        1433                 :GNC          56 : BecomeRegisteredListener(void)
                               1434                 :                : {
                               1435                 :                :     QueuePosition head;
                               1436                 :                :     QueuePosition max;
                               1437                 :                :     ProcNumber  prevListener;
                               1438                 :                : 
                               1439                 :                :     /*
                               1440                 :                :      * Nothing to do if we are already listening to something, nor if we
                               1441                 :                :      * already ran this routine in this transaction.
                               1442                 :                :      */
 4778 tgl@sss.pgh.pa.us        1443         [ +  + ]:CBC          56 :     if (amRegisteredListener)
 5871                          1444                 :             27 :         return;
                               1445                 :                : 
                               1446         [ -  + ]:             29 :     if (Trace_notify)
   59 tgl@sss.pgh.pa.us        1447         [ #  # ]:UNC           0 :         elog(DEBUG1, "BecomeRegisteredListener(%d)", MyProcPid);
                               1448                 :                : 
                               1449                 :                :     /*
                               1450                 :                :      * Before registering, make sure we will unlisten before dying. (Note:
                               1451                 :                :      * this action does not get undone if we abort later.)
                               1452                 :                :      */
 5871 tgl@sss.pgh.pa.us        1453         [ +  + ]:CBC          29 :     if (!unlistenExitRegistered)
                               1454                 :                :     {
 4470 rhaas@postgresql.org     1455                 :             17 :         before_shmem_exit(Async_UnlistenOnExit, 0);
 5871 tgl@sss.pgh.pa.us        1456                 :             17 :         unlistenExitRegistered = true;
                               1457                 :                :     }
                               1458                 :                : 
                               1459                 :                :     /*
                               1460                 :                :      * This is our first LISTEN, so establish our pointer.
                               1461                 :                :      *
                               1462                 :                :      * We set our pointer to the global tail pointer and then move it forward
                               1463                 :                :      * over already-committed notifications.  This ensures we cannot miss any
                               1464                 :                :      * not-yet-committed notifications.  We might get a few more but that
                               1465                 :                :      * doesn't hurt.
                               1466                 :                :      *
                               1467                 :                :      * In some scenarios there might be a lot of committed notifications that
                               1468                 :                :      * have not yet been pruned away (because some backend is being lazy about
                               1469                 :                :      * reading them).  To reduce our startup time, we can look at other
                               1470                 :                :      * backends and adopt the maximum "pos" pointer of any backend that's in
                               1471                 :                :      * our database; any notifications it's already advanced over are surely
                               1472                 :                :      * committed and need not be re-examined by us.  (We must consider only
                               1473                 :                :      * backends connected to our DB, because others will not have bothered to
                               1474                 :                :      * check committed-ness of notifications in our DB.)
                               1475                 :                :      *
                               1476                 :                :      * We need exclusive lock here so we can look at other backends' entries
                               1477                 :                :      * and manipulate the list links.
                               1478                 :                :      */
 2130                          1479                 :             29 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
 3819                          1480                 :             29 :     head = QUEUE_HEAD;
                               1481                 :             29 :     max = QUEUE_TAIL;
  742 heikki.linnakangas@i     1482                 :             29 :     prevListener = INVALID_PROC_NUMBER;
                               1483         [ +  + ]:             44 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
                               1484                 :                :     {
 2378 tgl@sss.pgh.pa.us        1485         [ +  - ]:             15 :         if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
                               1486   [ -  +  +  -  :             15 :             max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
                                              +  - ]
                               1487                 :                :         /* Also find last listening backend before this one */
  742 heikki.linnakangas@i     1488         [ +  + ]:             15 :         if (i < MyProcNumber)
 2378 tgl@sss.pgh.pa.us        1489                 :              8 :             prevListener = i;
                               1490                 :                :     }
  742 heikki.linnakangas@i     1491                 :             29 :     QUEUE_BACKEND_POS(MyProcNumber) = max;
                               1492                 :             29 :     QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
                               1493                 :             29 :     QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
   59 tgl@sss.pgh.pa.us        1494                 :GNC          29 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
                               1495                 :             29 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
                               1496                 :                :     /* Insert backend into list of listeners at correct position */
  742 heikki.linnakangas@i     1497         [ +  + ]:CBC          29 :     if (prevListener != INVALID_PROC_NUMBER)
                               1498                 :                :     {
                               1499                 :              4 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
                               1500                 :              4 :         QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
                               1501                 :                :     }
                               1502                 :                :     else
                               1503                 :                :     {
                               1504                 :             25 :         QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
                               1505                 :             25 :         QUEUE_FIRST_LISTENER = MyProcNumber;
                               1506                 :                :     }
 2130 tgl@sss.pgh.pa.us        1507                 :             29 :     LWLockRelease(NotifyQueueLock);
                               1508                 :                : 
                               1509                 :                :     /* Now we are listed in the global array, so remember we're listening */
 4778                          1510                 :             29 :     amRegisteredListener = true;
                               1511                 :                : 
                               1512                 :                :     /*
                               1513                 :                :      * Try to move our pointer forward as far as possible.  This will skip
                               1514                 :                :      * over already-committed notifications, which we want to do because they
                               1515                 :                :      * might be quite stale.  Note that we are not yet listening on anything,
                               1516                 :                :      * so we won't deliver such notifications to our frontend.  Also, although
                               1517                 :                :      * our transaction might have executed NOTIFY, those message(s) aren't
                               1518                 :                :      * queued yet so we won't skip them here.
                               1519                 :                :      */
 3819                          1520   [ +  +  +  + ]:             29 :     if (!QUEUE_POS_EQUAL(max, head))
                               1521                 :             15 :         asyncQueueReadAllNotifications();
                               1522                 :                : }
                               1523                 :                : 
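The list insertion above keeps listeners ordered by proc number: the scan remembers the last listed entry whose number is below ours, then splices the new entry after it, or at the head if there is none. A minimal standalone sketch of that splice follows, using a plain `next[]` array in place of the shared-memory macros. The names `MAX_PROCS`, `INVALID`, `ListenerList`, `listener_list_init`, and `listener_insert` are hypothetical and are not part of async.c; locking is omitted.

```c
#include <assert.h>

#define MAX_PROCS 8     /* hypothetical capacity, for illustration only */
#define INVALID   (-1)  /* stands in for INVALID_PROC_NUMBER */

typedef struct ListenerList
{
    int first;              /* stands in for QUEUE_FIRST_LISTENER */
    int next[MAX_PROCS];    /* stands in for QUEUE_NEXT_LISTENER(i) */
} ListenerList;

static void
listener_list_init(ListenerList *list)
{
    list->first = INVALID;
    for (int i = 0; i < MAX_PROCS; i++)
        list->next[i] = INVALID;
}

/* Insert "me" so that the list stays sorted by ascending proc number. */
static void
listener_insert(ListenerList *list, int me)
{
    int prev = INVALID;

    assert(me >= 0 && me < MAX_PROCS);

    /* Remember the last listed entry whose number is below ours. */
    for (int i = list->first; i != INVALID; i = list->next[i])
    {
        if (i < me)
            prev = i;
    }

    if (prev != INVALID)
    {
        /* Splice in after the predecessor. */
        list->next[me] = list->next[prev];
        list->next[prev] = me;
    }
    else
    {
        /* No smaller entry exists, so we become the new head. */
        list->next[me] = list->first;
        list->first = me;
    }
}
```

Inserting 5, then 2, then 3 into an empty list yields the chain 2, 3, 5. The sketch scans the whole list rather than stopping at the first larger entry, mirroring the loop above, which must visit every listener anyway to compute the maximum position.
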
                               1524                 :                : /*
                               1525                 :                :  * PrepareTableEntriesForListen --- subroutine for PreCommit_Notify
                               1526                 :                :  *
                               1527                 :                :  * Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
                               1528                 :                :  * an entry in localChannelTable, and pre-allocating an entry in the shared
                               1529                 :                :  * globalChannelTable with listening=false.  The listening flag will be set
                               1530                 :                :  * to true in AtCommit_Notify.  If we abort later, unwanted table entries
                               1531                 :                :  * will be removed.
                               1532                 :                :  */
                               1533                 :                : static void
   59 tgl@sss.pgh.pa.us        1534                 :GNC          56 : PrepareTableEntriesForListen(const char *channel)
                               1535                 :                : {
                               1536                 :                :     GlobalChannelKey key;
                               1537                 :                :     GlobalChannelEntry *entry;
                               1538                 :                :     bool        found;
                               1539                 :                :     ListenerEntry *listeners;
                               1540                 :                :     PendingListenEntry *pending;
                               1541                 :                : 
                               1542                 :                :     /*
                               1543                 :                :      * Record in local pending hash that we want to LISTEN, overwriting any
                               1544                 :                :      * earlier attempt to UNLISTEN.
                               1545                 :                :      */
                               1546                 :                :     pending = (PendingListenEntry *)
                               1547                 :             56 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
                               1548                 :             56 :     pending->action = PENDING_LISTEN;
                               1549                 :                : 
                               1550                 :                :     /*
                               1551                 :                :      * Ensure that there is an entry for the channel in localChannelTable.
                               1552                 :                :      * (Should this fail, we can just roll back.)  If the transaction fails
                               1553                 :                :      * after this point, we will remove the entry if appropriate during
                               1554                 :                :      * ApplyPendingListenActions.  Note that this entry allows IsListeningOn()
                               1555                 :                :      * to return TRUE; we assume nothing is going to consult that before
                               1556                 :                :      * AtCommit_Notify/AtAbort_Notify.  However, if later actions attempt to
                               1557                 :                :      * UNLISTEN this channel or UNLISTEN *, we need to have the local entry
                               1558                 :                :      * present to ensure they do the right things; see
                               1559                 :                :      * PrepareTableEntriesForUnlisten and PrepareTableEntriesForUnlistenAll.
                               1560                 :                :      */
                               1561                 :             56 :     (void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
                               1562                 :                : 
                               1563                 :                :     /* Pre-allocate entry in shared globalChannelTable with listening=false */
                               1564                 :             56 :     GlobalChannelKeyInit(&key, MyDatabaseId, channel);
                               1565                 :             56 :     entry = dshash_find_or_insert(globalChannelTable, &key, &found);
                               1566                 :                : 
                               1567         [ +  + ]:             56 :     if (!found)
                               1568                 :                :     {
                               1569                 :                :         /* New channel entry, so initialize it to a safe state */
                               1570                 :             34 :         entry->listenersArray = InvalidDsaPointer;
                               1571                 :             34 :         entry->numListeners = 0;
                               1572                 :             34 :         entry->allocatedListeners = 0;
                               1573                 :                :     }
                               1574                 :                : 
                               1575                 :                :     /*
                               1576                 :                :      * Create listenersArray if entry doesn't have one.  It's tempting to fold
                               1577                 :                :      * this into the !found case, but this coding allows us to cope in case
                               1578                 :                :      * dsa_allocate() failed in an earlier attempt.
                               1579                 :                :      */
                               1580         [ +  + ]:             56 :     if (!DsaPointerIsValid(entry->listenersArray))
                               1581                 :                :     {
                               1582                 :             34 :         entry->listenersArray = dsa_allocate(globalChannelDSA,
                               1583                 :                :                                              sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
                               1584                 :             34 :         entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
                               1585                 :                :     }
                               1586                 :                : 
                               1587                 :                :     listeners = (ListenerEntry *)
                               1588                 :             56 :         dsa_get_address(globalChannelDSA, entry->listenersArray);
                               1589                 :                : 
                               1590                 :                :     /*
                               1591                 :                :      * Check if we already have a ListenerEntry (possibly from earlier in this
                               1592                 :                :      * transaction)
                               1593                 :                :      */
                               1594         [ +  + ]:             75 :     for (int i = 0; i < entry->numListeners; i++)
                               1595                 :                :     {
                               1596         [ +  + ]:             30 :         if (listeners[i].procNo == MyProcNumber)
                               1597                 :                :         {
                               1598                 :                :             /* Already have an entry; listening flag stays as-is until commit */
                               1599                 :             11 :             dshash_release_lock(globalChannelTable, entry);
                               1600                 :             11 :             return;
                               1601                 :                :         }
                               1602                 :                :     }
                               1603                 :                : 
                               1604                 :                :     /* Need to add a new entry; grow array if necessary */
                               1605         [ +  + ]:             45 :     if (entry->numListeners >= entry->allocatedListeners)
                               1606                 :                :     {
                               1607                 :              1 :         int         new_size = entry->allocatedListeners * 2;
                               1608                 :              1 :         dsa_pointer old_array = entry->listenersArray;
                               1609                 :              1 :         dsa_pointer new_array = dsa_allocate(globalChannelDSA,
                               1610                 :                :                                              sizeof(ListenerEntry) * new_size);
                               1611                 :              1 :         ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA, new_array);
                               1612                 :                : 
                               1613                 :              1 :         memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
                               1614                 :              1 :         entry->listenersArray = new_array;
                               1615                 :              1 :         entry->allocatedListeners = new_size;
                               1616                 :              1 :         dsa_free(globalChannelDSA, old_array);
                               1617                 :              1 :         listeners = new_listeners;
                               1618                 :                :     }
                               1619                 :                : 
                               1620                 :             45 :     listeners[entry->numListeners].procNo = MyProcNumber;
                               1621                 :             45 :     listeners[entry->numListeners].listening = false;    /* staged, not yet
                               1622                 :                :                                                          * committed */
                               1623                 :             45 :     entry->numListeners++;
                               1624                 :                : 
                               1625                 :             45 :     dshash_release_lock(globalChannelTable, entry);
                               1626                 :                : }
                               1627                 :                : 
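The staging logic in PrepareTableEntriesForListen amounts to find-or-append on a growable array that doubles when full. The following is a rough sketch of that pattern only, with `malloc`/`free` standing in for `dsa_allocate`/`dsa_free` and no locking. `Listener`, `Channel`, `channel_init`, `channel_stage_listener`, and `INITIAL_SIZE` are made-up names for illustration, and the sketch reports allocation failure by returning false, which is an assumption of the example rather than the behavior of the code above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define INITIAL_SIZE 4  /* hypothetical; stands in for INITIAL_LISTENERS_ARRAY_SIZE */

typedef struct Listener
{
    int  procNo;
    bool listening;
} Listener;

typedef struct Channel
{
    Listener *listeners;
    int       numListeners;
    int       allocatedListeners;
} Channel;

static void
channel_init(Channel *ch)
{
    ch->listeners = NULL;
    ch->numListeners = 0;
    ch->allocatedListeners = 0;
}

/* Stage a listener with listening=false; returns false only on OOM. */
static bool
channel_stage_listener(Channel *ch, int procNo)
{
    assert(ch != NULL);

    /* Allocate lazily, so a failed earlier attempt can be retried. */
    if (ch->listeners == NULL)
    {
        ch->listeners = malloc(sizeof(Listener) * INITIAL_SIZE);
        if (ch->listeners == NULL)
            return false;
        ch->allocatedListeners = INITIAL_SIZE;
    }

    /* Already present: leave the listening flag untouched. */
    for (int i = 0; i < ch->numListeners; i++)
    {
        if (ch->listeners[i].procNo == procNo)
            return true;
    }

    /* Double the array when it is full. */
    if (ch->numListeners >= ch->allocatedListeners)
    {
        int       new_size = ch->allocatedListeners * 2;
        Listener *new_array = malloc(sizeof(Listener) * new_size);

        if (new_array == NULL)
            return false;
        memcpy(new_array, ch->listeners,
               sizeof(Listener) * ch->numListeners);
        free(ch->listeners);
        ch->listeners = new_array;
        ch->allocatedListeners = new_size;
    }

    ch->listeners[ch->numListeners].procNo = procNo;
    ch->listeners[ch->numListeners].listening = false;  /* staged only */
    ch->numListeners++;
    return true;
}
```

Staging five distinct proc numbers grows the capacity from 4 to 8, and staging an existing number a second time leaves the count unchanged.
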
                               1628                 :                : /*
                               1629                 :                :  * PrepareTableEntriesForUnlisten --- subroutine for PreCommit_Notify
                               1630                 :                :  *
                               1631                 :                :  * Prepare an UNLISTEN by recording it in pendingListenActions, but only if
                               1632                 :                :  * we're currently listening (committed or staged).  We don't touch
                               1633                 :                :  * globalChannelTable yet - the listener keeps receiving signals until
                               1634                 :                :  * commit, when the entry is removed.
                               1635                 :                :  */
                               1636                 :                : static void
                               1637                 :              3 : PrepareTableEntriesForUnlisten(const char *channel)
                               1638                 :                : {
                               1639                 :                :     PendingListenEntry *pending;
                               1640                 :                : 
                               1641                 :                :     /*
                               1642                 :                :      * If the channel name is not in localChannelTable, then we are neither
                               1643                 :                :      * listening on it nor preparing to listen on it, so we don't need to
                               1644                 :                :      * record an UNLISTEN action.
                               1645                 :                :      */
                               1646         [ -  + ]:              3 :     Assert(localChannelTable != NULL);
                               1647         [ -  + ]:              3 :     if (hash_search(localChannelTable, channel, HASH_FIND, NULL) == NULL)
 5871 tgl@sss.pgh.pa.us        1648                 :LBC        (10) :         return;
                               1649                 :                : 
                               1650                 :                :     /*
                               1651                 :                :      * Record in local pending hash that we want to UNLISTEN, overwriting any
                               1652                 :                :      * earlier attempt to LISTEN.  Don't touch localChannelTable or
                               1653                 :                :      * globalChannelTable yet - we keep receiving signals until commit.
                               1654                 :                :      */
                               1655                 :                :     pending = (PendingListenEntry *)
   59 tgl@sss.pgh.pa.us        1656                 :GNC           3 :         hash_search(pendingListenActions, channel, HASH_ENTER, NULL);
                               1657                 :              3 :     pending->action = PENDING_UNLISTEN;
                               1658                 :                : }
                               1659                 :                : 
                               1660                 :                : /*
                               1661                 :                :  * PrepareTableEntriesForUnlistenAll --- subroutine for PreCommit_Notify
                               1662                 :                :  *
                               1663                 :                :  * Prepare UNLISTEN * by recording an UNLISTEN for all listened or
                               1664                 :                :  * about-to-be-listened channels in pendingListenActions.
                               1665                 :                :  */
                               1666                 :                : static void
                               1667                 :             48 : PrepareTableEntriesForUnlistenAll(void)
                               1668                 :                : {
                               1669                 :                :     HASH_SEQ_STATUS seq;
                               1670                 :                :     ChannelName *channelEntry;
                               1671                 :                :     PendingListenEntry *pending;
                               1672                 :                : 
                               1673                 :                :     /*
                               1674                 :                :      * Scan localChannelTable, which will have the names of all channels that
                               1675                 :                :      * we are listening on or have prepared to listen on.  Record an UNLISTEN
                               1676                 :                :      * action for each one, overwriting any earlier attempt to LISTEN.
                               1677                 :                :      */
                               1678                 :             48 :     hash_seq_init(&seq, localChannelTable);
                               1679         [ +  + ]:             82 :     while ((channelEntry = (ChannelName *) hash_seq_search(&seq)) != NULL)
                               1680                 :                :     {
                               1681                 :                :         pending = (PendingListenEntry *)
                               1682                 :             34 :             hash_search(pendingListenActions, channelEntry->channel, HASH_ENTER, NULL);
                               1683                 :             34 :         pending->action = PENDING_UNLISTEN;
                               1684                 :                :     }
                               1685                 :             48 : }
                               1686                 :                : 
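Both UNLISTEN helpers rely on the same rule as the LISTEN path: the pending table holds one entry per channel, and the most recently staged action overwrites any earlier one. Below is a small sketch of that "last action wins" behavior, using a fixed array instead of a dynahash table. `PendingAction`, `PendingEntry`, `PendingTable`, `pending_set`, `MAX_PENDING`, and the 64-byte name buffer are assumptions of the example; only the `PENDING_LISTEN` and `PENDING_UNLISTEN` names come from the listing.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define MAX_PENDING 16  /* hypothetical fixed capacity */

typedef enum
{
    PENDING_LISTEN,
    PENDING_UNLISTEN
} PendingAction;

typedef struct PendingEntry
{
    char          channel[64];  /* hypothetical name buffer size */
    PendingAction action;
} PendingEntry;

typedef struct PendingTable
{
    PendingEntry entries[MAX_PENDING];
    int          count;
} PendingTable;

/* Find or create the entry for "channel" and overwrite its action. */
static void
pending_set(PendingTable *t, const char *channel, PendingAction action)
{
    PendingEntry *e;

    for (int i = 0; i < t->count; i++)
    {
        if (strcmp(t->entries[i].channel, channel) == 0)
        {
            t->entries[i].action = action;  /* later action wins */
            return;
        }
    }

    assert(t->count < MAX_PENDING);
    e = &t->entries[t->count++];
    snprintf(e->channel, sizeof(e->channel), "%s", channel);
    e->action = action;
}
```

Calling `pending_set` with `PENDING_LISTEN` and then `PENDING_UNLISTEN` for the same channel leaves a single entry whose action is `PENDING_UNLISTEN`.
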
                               1687                 :                : /*
                               1688                 :                :  * RemoveListenerFromChannel --- remove idx'th listener from global channel entry
                               1689                 :                :  *
                               1690                 :                :  * Decrements numListeners, compacts the array, and frees the entry if empty.
                               1691                 :                :  * Sets *entry_ptr to NULL if the entry was deleted.
                               1692                 :                :  *
                               1693                 :                :  * We could get the listeners pointer from the entry, but all callers
                               1694                 :                :  * already have it at hand.
                               1695                 :                :  */
                               1696                 :                : static void
                               1697                 :             37 : RemoveListenerFromChannel(GlobalChannelEntry **entry_ptr,
                               1698                 :                :                           ListenerEntry *listeners,
                               1699                 :                :                           int idx)
                               1700                 :                : {
                               1701                 :             37 :     GlobalChannelEntry *entry = *entry_ptr;
                               1702                 :                : 
                               1703                 :             37 :     entry->numListeners--;
                               1704         [ +  + ]:             37 :     if (idx < entry->numListeners)
                               1705                 :              8 :         memmove(&listeners[idx], &listeners[idx + 1],
                               1706                 :              8 :                 sizeof(ListenerEntry) * (entry->numListeners - idx));
                               1707                 :                : 
                               1708         [ +  + ]:             37 :     if (entry->numListeners == 0)
                               1709                 :                :     {
                               1710                 :             26 :         dsa_free(globalChannelDSA, entry->listenersArray);
                               1711                 :             26 :         dshash_delete_entry(globalChannelTable, entry);
                               1712                 :                :         /* tells caller not to release the entry's lock: */
                               1713                 :             26 :         *entry_ptr = NULL;
                               1714                 :                :     }
                               1715                 :             37 : }
                               1716                 :                : 
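The compaction step in RemoveListenerFromChannel is the usual "delete from the middle of a dense array" idiom: decrement the count, then shift the tail down by one slot. Here is a self-contained sketch of just that step, assuming a simplified `ListenerSlot` struct and a hypothetical helper `listeners_remove_at`; freeing the array and deleting the hash entry when the count reaches zero are left out.

```c
#include <assert.h>
#include <string.h>

typedef struct ListenerSlot
{
    int procNo;
    int listening;
} ListenerSlot;

/* Remove listeners[idx] from an array of n entries; returns the new count. */
static int
listeners_remove_at(ListenerSlot *listeners, int n, int idx)
{
    assert(idx >= 0 && idx < n);

    n--;
    /* Source and destination overlap, so memmove is required, not memcpy. */
    if (idx < n)
        memmove(&listeners[idx], &listeners[idx + 1],
                sizeof(ListenerSlot) * (n - idx));
    return n;
}
```

Removing the last element skips the `memmove` entirely, which is why the listing shows that branch taken on only some of the calls.
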
                               1717                 :                : /*
                               1718                 :                :  * ApplyPendingListenActions
                               1719                 :                :  *
                               1720                 :                :  * Apply, or revert, staged listen/unlisten changes to the local and global
                               1721                 :                :  * hash tables.
                               1722                 :                :  */
                               1723                 :                : static void
                               1724                 :          26888 : ApplyPendingListenActions(bool isCommit)
                               1725                 :                : {
                               1726                 :                :     HASH_SEQ_STATUS seq;
                               1727                 :                :     PendingListenEntry *pending;
                               1728                 :                : 
                               1729                 :                :     /* Quick exit if nothing to do */
                               1730         [ +  + ]:          26888 :     if (pendingListenActions == NULL)
                               1731                 :          26797 :         return;
                               1732                 :                : 
                               1733                 :                :     /* We made a globalChannelTable before building pendingListenActions */
                               1734         [ -  + ]:             91 :     if (globalChannelTable == NULL)
   59 tgl@sss.pgh.pa.us        1735         [ #  # ]:UNC           0 :         elog(PANIC, "global channel table missing post-commit/abort");
                               1736                 :                : 
                               1737                 :                :     /* For each staged action ... */
   59 tgl@sss.pgh.pa.us        1738                 :GNC          91 :     hash_seq_init(&seq, pendingListenActions);
                               1739         [ +  + ]:            183 :     while ((pending = (PendingListenEntry *) hash_seq_search(&seq)) != NULL)
                               1740                 :                :     {
                               1741                 :                :         GlobalChannelKey key;
                               1742                 :                :         GlobalChannelEntry *entry;
                               1743                 :             92 :         bool        removeLocal = true;
                               1744                 :             92 :         bool        foundListener = false;
                               1745                 :                : 
                               1746                 :                :         /*
                               1747                 :                :          * Find the global entry for this channel.  If isCommit, it had better
                               1748                 :                :          * exist (it was created in PreCommit).  In an abort, it might not
                               1749                 :                :          * exist, in which case we are not listening and should discard any
                               1750                 :                :          * local entry that PreCommit may have managed to create.
                               1751                 :                :          */
                               1752                 :             92 :         GlobalChannelKeyInit(&key, MyDatabaseId, pending->channel);
                               1753                 :             92 :         entry = dshash_find(globalChannelTable, &key, true);
                               1754         [ +  - ]:             92 :         if (entry != NULL)
                               1755                 :                :         {
                               1756                 :                :             /* Scan entry to find the ListenerEntry for this backend */
                               1757                 :                :             ListenerEntry *listeners;
                               1758                 :                : 
                               1759                 :                :             listeners = (ListenerEntry *)
                               1760                 :             92 :                 dsa_get_address(globalChannelDSA, entry->listenersArray);
                               1761                 :                : 
                               1762         [ +  - ]:            116 :             for (int i = 0; i < entry->numListeners; i++)
                               1763                 :                :             {
                               1764         [ +  + ]:            116 :                 if (listeners[i].procNo != MyProcNumber)
                               1765                 :             24 :                     continue;
                               1766                 :             92 :                 foundListener = true;
                               1767         [ +  - ]:             92 :                 if (isCommit)
                               1768                 :                :                 {
                               1769         [ +  + ]:             92 :                     if (pending->action == PENDING_LISTEN)
                               1770                 :                :                     {
                               1771                 :                :                         /*
                               1772                 :                :                          * LISTEN being committed: set listening=true.
                               1773                 :                :                          * localChannelTable entry was created during
                               1774                 :                :                          * PreCommit and should be kept.
                               1775                 :                :                          */
                               1776                 :             55 :                         listeners[i].listening = true;
                               1777                 :             55 :                         removeLocal = false;
                               1778                 :                :                     }
                               1779                 :                :                     else
                               1780                 :                :                     {
                               1781                 :                :                         /*
                               1782                 :                :                          * UNLISTEN being committed: remove pre-allocated
                               1783                 :                :                          * entries from both tables.
                               1784                 :                :                          */
                               1785                 :             37 :                         RemoveListenerFromChannel(&entry, listeners, i);
                               1786                 :                :                     }
                               1787                 :                :                 }
                               1788                 :                :                 else
                               1789                 :                :                 {
                               1790                 :                :                     /*
                               1791                 :                :                      * Note: this part is reachable only if the transaction
                               1792                 :                :                      * aborts after PreCommit_Notify() has made some
                               1793                 :                :                      * pendingListenActions entries, so it's pretty hard to
                               1794                 :                :                      * test.
                               1795                 :                :                      */
   59 tgl@sss.pgh.pa.us        1796         [ #  # ]:UNC           0 :                     if (!listeners[i].listening)
                               1797                 :                :                     {
                               1798                 :                :                         /*
                               1799                 :                :                          * Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
                               1800                 :                :                          * and we weren't listening before, so remove
                               1801                 :                :                          * pre-allocated entries from both tables.
                               1802                 :                :                          */
                               1803                 :              0 :                         RemoveListenerFromChannel(&entry, listeners, i);
                               1804                 :                :                     }
                               1805                 :                :                     else
                               1806                 :                :                     {
                               1807                 :                :                         /*
                               1808                 :                :                          * We're aborting, but the previous state was that
                               1809                 :                :                          * we're listening, so keep localChannelTable entry.
                               1810                 :                :                          */
                               1811                 :              0 :                         removeLocal = false;
                               1812                 :                :                     }
                               1813                 :                :                 }
   59 tgl@sss.pgh.pa.us        1814                 :GNC          92 :                 break;          /* there shouldn't be another match */
                               1815                 :                :             }
                               1816                 :                : 
                               1817                 :                :             /* We might have already released the entry by removing it */
                               1818         [ +  + ]:             92 :             if (entry != NULL)
                               1819                 :             66 :                 dshash_release_lock(globalChannelTable, entry);
                               1820                 :                :         }
                               1821                 :                : 
                               1822                 :                :         /*
                               1823                 :                :          * If we're committing a LISTEN action, we should have found a
                               1824                 :                :          * matching ListenerEntry, but otherwise it's okay if we didn't.
                               1825                 :                :          */
                               1826   [ +  -  +  +  :             92 :         if (isCommit && pending->action == PENDING_LISTEN && !foundListener)
                                              -  + ]
   59 tgl@sss.pgh.pa.us        1827         [ #  # ]:UNC           0 :             elog(PANIC, "could not find listener entry for channel \"%s\" backend %d",
                               1828                 :                :                  pending->channel, MyProcNumber);
                               1829                 :                : 
                               1830                 :                :         /*
                               1831                 :                :          * If we did not find a globalChannelTable entry for our backend, or
                               1832                 :                :          * if we are unlistening, remove any localChannelTable entry that may
                               1833                 :                :          * exist.  (Note in particular that this cleans up if we created a
                               1834                 :                :          * localChannelTable entry and then failed while trying to create a
                               1835                 :                :          * globalChannelTable entry.)
                               1836                 :                :          */
   59 tgl@sss.pgh.pa.us        1837   [ +  +  +  - ]:GNC          92 :         if (removeLocal && localChannelTable != NULL)
                               1838                 :             37 :             (void) hash_search(localChannelTable, pending->channel,
                               1839                 :                :                                HASH_REMOVE, NULL);
                               1840                 :                :     }
 5871 tgl@sss.pgh.pa.us        1841                 :ECB         (3) : }
                               1842                 :                : 
                               1843                 :                : /*
                               1844                 :                :  * CleanupListenersOnExit --- called from Async_UnlistenOnExit
                               1845                 :                :  *
                               1846                 :                :  *      Remove this backend from all channels in the shared global table.
                               1847                 :                :  */
                               1848                 :                : static void
   59 tgl@sss.pgh.pa.us        1849                 :GNC          17 : CleanupListenersOnExit(void)
                               1850                 :                : {
                               1851                 :                :     dshash_seq_status status;
                               1852                 :                :     GlobalChannelEntry *entry;
                               1853                 :                : 
 5871 tgl@sss.pgh.pa.us        1854         [ -  + ]:CBC          17 :     if (Trace_notify)
   59 tgl@sss.pgh.pa.us        1855         [ #  # ]:UNC           0 :         elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
                               1856                 :                : 
                               1857                 :                :     /* Clear our local cache (not really necessary, but be consistent) */
   59 tgl@sss.pgh.pa.us        1858         [ +  - ]:GNC          17 :     if (localChannelTable != NULL)
                               1859                 :                :     {
                               1860                 :             17 :         hash_destroy(localChannelTable);
                               1861                 :             17 :         localChannelTable = NULL;
                               1862                 :                :     }
                               1863                 :                : 
                               1864                 :                :     /* Now remove our entries from the shared globalChannelTable */
                               1865         [ -  + ]:             17 :     if (globalChannelTable == NULL)
   59 tgl@sss.pgh.pa.us        1866                 :UNC           0 :         return;
                               1867                 :                : 
   59 tgl@sss.pgh.pa.us        1868                 :GNC          17 :     dshash_seq_init(&status, globalChannelTable, true);
                               1869         [ +  + ]:             25 :     while ((entry = dshash_seq_next(&status)) != NULL)
                               1870                 :                :     {
                               1871                 :                :         ListenerEntry *listeners;
                               1872                 :                : 
                               1873         [ -  + ]:              8 :         if (entry->key.dboid != MyDatabaseId)
   59 tgl@sss.pgh.pa.us        1874                 :UNC           0 :             continue;           /* not relevant */
                               1875                 :                : 
                               1876                 :                :         listeners = (ListenerEntry *)
   59 tgl@sss.pgh.pa.us        1877                 :GNC           8 :             dsa_get_address(globalChannelDSA, entry->listenersArray);
                               1878                 :                : 
                               1879         [ +  - ]:              8 :         for (int i = 0; i < entry->numListeners; i++)
                               1880                 :                :         {
                               1881         [ +  - ]:              8 :             if (listeners[i].procNo == MyProcNumber)
                               1882                 :                :             {
                               1883                 :              8 :                 entry->numListeners--;
                               1884         [ -  + ]:              8 :                 if (i < entry->numListeners)
   59 tgl@sss.pgh.pa.us        1885                 :UNC           0 :                     memmove(&listeners[i], &listeners[i + 1],
                               1886                 :              0 :                             sizeof(ListenerEntry) * (entry->numListeners - i));
                               1887                 :                : 
   59 tgl@sss.pgh.pa.us        1888         [ +  - ]:GNC           8 :                 if (entry->numListeners == 0)
                               1889                 :                :                 {
                               1890                 :              8 :                     dsa_free(globalChannelDSA, entry->listenersArray);
                               1891                 :              8 :                     dshash_delete_current(&status);
                               1892                 :                :                 }
                               1893                 :              8 :                 break;
                               1894                 :                :             }
                               1895                 :                :         }
                               1896                 :                :     }
                               1897                 :             17 :     dshash_seq_term(&status);
 5871 tgl@sss.pgh.pa.us        1898                 :ECB        (30) : }
                               1899                 :                : 
                               1900                 :                : /*
                               1901                 :                :  * Test whether we are actively listening on the given channel name.
                               1902                 :                :  *
                               1903                 :                :  * Note: this function is executed for every notification found in the queue.
                               1904                 :                :  */
                               1905                 :                : static bool
 5871 tgl@sss.pgh.pa.us        1906                 :CBC          55 : IsListeningOn(const char *channel)
                               1907                 :                : {
   59 tgl@sss.pgh.pa.us        1908         [ -  + ]:GNC          55 :     if (localChannelTable == NULL)
   59 tgl@sss.pgh.pa.us        1909                 :UNC           0 :         return false;
                               1910                 :                : 
   59 tgl@sss.pgh.pa.us        1911                 :GNC          55 :     return (hash_search(localChannelTable, channel, HASH_FIND, NULL) != NULL);
                               1912                 :                : }
                               1913                 :                : 
                               1914                 :                : /*
                               1915                 :                :  * Remove our entry from the listeners array when we are no longer listening
                               1916                 :                :  * on any channel.  NB: must not fail if we're already not listening.
                               1917                 :                :  */
                               1918                 :                : static void
 5871 tgl@sss.pgh.pa.us        1919                 :CBC          40 : asyncQueueUnregister(void)
                               1920                 :                : {
   59 tgl@sss.pgh.pa.us        1921   [ +  +  -  + ]:GNC          40 :     Assert(LocalChannelTableIsEmpty()); /* else caller error */
                               1922                 :                : 
 4673 bruce@momjian.us         1923         [ +  + ]:CBC          40 :     if (!amRegisteredListener)  /* nothing to do */
 4778 tgl@sss.pgh.pa.us        1924                 :             11 :         return;
                               1925                 :                : 
                               1926                 :                :     /*
                               1927                 :                :      * Need exclusive lock here to manipulate list links.
                               1928                 :                :      */
 2130                          1929                 :             29 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
                               1930                 :                :     /* Mark our entry as invalid */
  742 heikki.linnakangas@i     1931                 :             29 :     QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
                               1932                 :             29 :     QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
   59 tgl@sss.pgh.pa.us        1933                 :GNC          29 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
                               1934                 :             29 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
                               1935                 :                :     /* and remove it from the list */
  742 heikki.linnakangas@i     1936         [ +  + ]:CBC          29 :     if (QUEUE_FIRST_LISTENER == MyProcNumber)
                               1937                 :             24 :         QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
                               1938                 :                :     else
                               1939                 :                :     {
                               1940         [ +  - ]:              8 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
                               1941                 :                :         {
                               1942         [ +  + ]:              8 :             if (QUEUE_NEXT_LISTENER(i) == MyProcNumber)
                               1943                 :                :             {
                               1944                 :              5 :                 QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber);
 2378 tgl@sss.pgh.pa.us        1945                 :              5 :                 break;
                               1946                 :                :             }
                               1947                 :                :         }
                               1948                 :                :     }
  742 heikki.linnakangas@i     1949                 :             29 :     QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER;
 2130 tgl@sss.pgh.pa.us        1950                 :             29 :     LWLockRelease(NotifyQueueLock);
                               1951                 :                : 
                               1952                 :                :     /* mark ourselves as no longer listed in the global array */
 4778                          1953                 :             29 :     amRegisteredListener = false;
                               1954                 :                : }
                               1955                 :                : 
                               1956                 :                : /*
                               1957                 :                :  * Test whether there is room to insert more notification messages.
                               1958                 :                :  *
                               1959                 :                :  * Caller must hold at least shared NotifyQueueLock.
                               1960                 :                :  */
                               1961                 :                : static bool
 5871                          1962                 :             90 : asyncQueueIsFull(void)
                               1963                 :                : {
  600 michael@paquier.xyz      1964                 :             90 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
                               1965                 :             90 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
                               1966                 :             90 :     int64       occupied = headPage - tailPage;
                               1967                 :                : 
  837 akorotkov@postgresql     1968                 :             90 :     return occupied >= max_notify_queue_pages;
                               1969                 :                : }
                               1970                 :                : 
                               1971                 :                : /*
                               1972                 :                :  * Advance the QueuePosition to the next entry, assuming that the current
                               1973                 :                :  * entry is of length entryLength.  If we jump to a new page the function
                               1974                 :                :  * returns true, else false.
                               1975                 :                :  */
                               1976                 :                : static bool
 4066 tgl@sss.pgh.pa.us        1977                 :           2413 : asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
                               1978                 :                : {
  837 akorotkov@postgresql     1979                 :           2413 :     int64       pageno = QUEUE_POS_PAGE(*position);
 5861 bruce@momjian.us         1980                 :           2413 :     int         offset = QUEUE_POS_OFFSET(*position);
                               1981                 :           2413 :     bool        pageJump = false;
                               1982                 :                : 
                               1983                 :                :     /*
                               1984                 :                :      * Move to the next writing position: First jump over what we have just
                               1985                 :                :      * written or read.
                               1986                 :                :      */
 5871 tgl@sss.pgh.pa.us        1987                 :           2413 :     offset += entryLength;
                               1988         [ -  + ]:           2413 :     Assert(offset <= QUEUE_PAGESIZE);
                               1989                 :                : 
                               1990                 :                :     /*
                               1991                 :                :      * Second, check whether another entry can possibly be written to the
                               1992                 :                :      * page.  If so, stay here: we have reached the next position.  If not,
                               1993                 :                :      * we need to move on to the next page.
                               1994                 :                :      */
                               1995         [ +  + ]:           2413 :     if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE)
                               1996                 :                :     {
                               1997                 :             38 :         pageno++;
                               1998                 :             38 :         offset = 0;
                               1999                 :             38 :         pageJump = true;
                               2000                 :                :     }
                               2001                 :                : 
                               2002                 :           2413 :     SET_QUEUE_POS(*position, pageno, offset);
                               2003                 :           2413 :     return pageJump;
                               2004                 :                : }
                               2005                 :                : 
                               2006                 :                : /*
                               2007                 :                :  * Fill the AsyncQueueEntry at *qe with an outbound notification message.
                               2008                 :                :  */
                               2009                 :                : static void
                               2010                 :           1132 : asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
                               2011                 :                : {
 2404                          2012                 :           1132 :     size_t      channellen = n->channel_len;
                               2013                 :           1132 :     size_t      payloadlen = n->payload_len;
                               2014                 :                :     int         entryLength;
                               2015                 :                : 
 5871                          2016         [ -  + ]:           1132 :     Assert(channellen < NAMEDATALEN);
                               2017         [ -  + ]:           1132 :     Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH);
                               2018                 :                : 
                               2019                 :                :     /* The terminators are already included in AsyncQueueEntryEmptySize */
                               2020                 :           1132 :     entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen;
                               2021                 :           1132 :     entryLength = QUEUEALIGN(entryLength);
                               2022                 :           1132 :     qe->length = entryLength;
                               2023                 :           1132 :     qe->dboid = MyDatabaseId;
                               2024                 :           1132 :     qe->xid = GetCurrentTransactionId();
                               2025                 :           1132 :     qe->srcPid = MyProcPid;
 2404                          2026                 :           1132 :     memcpy(qe->data, n->data, channellen + payloadlen + 2);
 5871                          2027                 :           1132 : }
                               2028                 :                : 
                               2029                 :                : /*
                               2030                 :                :  * Add pending notifications to the queue.
                               2031                 :                :  *
                               2032                 :                :  * We go page by page here, i.e. we stop once we have to go to a new page but
                               2033                 :                :  * we will be called again and then fill that next page. If an entry does not
                               2034                 :                :  * fit into the current page, we write a dummy entry with an InvalidOid as the
                               2035                 :                :  * database OID in order to fill the page. So every page is always used up to
                               2036                 :                :  * the last byte which simplifies reading the page later.
                               2037                 :                :  *
                               2038                 :                :  * We are passed the list cell (in pendingNotifies->events) containing the next
                               2039                 :                :  * notification to write and return the first still-unwritten cell back.
                               2040                 :                :  * Eventually we will return NULL indicating all is done.
                               2041                 :                :  *
                               2042                 :                :  * The caller already holds NotifyQueueLock; we acquire the page-specific
                               2043                 :                :  * SLRU bank lock locally in this function.
                               2044                 :                :  */
                               2045                 :                : static ListCell *
                               2046                 :             90 : asyncQueueAddEntries(ListCell *nextNotify)
                               2047                 :                : {
                               2048                 :                :     AsyncQueueEntry qe;
                               2049                 :                :     QueuePosition queue_head;
                               2050                 :                :     int64       pageno;
                               2051                 :                :     int         offset;
                               2052                 :                :     int         slotno;
                               2053                 :                :     LWLock     *prevlock;
                               2054                 :                : 
                               2055                 :                :     /*
                               2056                 :                :      * We work with a local copy of QUEUE_HEAD, which we write back to shared
                               2057                 :                :      * memory upon exiting.  The reason for this is that if we have to advance
                               2058                 :                :      * to a new page, SimpleLruZeroPage might fail (out of disk space, for
                               2059                 :                :      * instance), and we must not advance QUEUE_HEAD if it does.  (Otherwise,
                               2060                 :                :      * subsequent insertions would try to put entries into a page that slru.c
                               2061                 :                :      * thinks doesn't exist yet.)  So, use a local position variable.  Note
                               2062                 :                :      * that if we do fail, any already-inserted queue entries are forgotten;
                               2063                 :                :      * this is okay, since they'd be useless anyway after our transaction
                               2064                 :                :      * rolls back.
                               2065                 :                :      */
 5007                          2066                 :             90 :     queue_head = QUEUE_HEAD;
                               2067                 :                : 
                               2068                 :                :     /*
                               2069                 :                :      * If this is the first write since the postmaster started, we need to
                               2070                 :                :      * initialize the first page of the async SLRU.  Otherwise, the current
                               2071                 :                :      * page should be initialized already, so just fetch it.
                               2072                 :                :      */
                               2073                 :             90 :     pageno = QUEUE_POS_PAGE(queue_head);
  746 alvherre@alvh.no-ip.     2074                 :             90 :     prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
                               2075                 :                : 
                               2076                 :                :     /* We hold both NotifyQueueLock and SLRU bank lock during this operation */
                               2077                 :             90 :     LWLockAcquire(prevlock, LW_EXCLUSIVE);
                               2078                 :                : 
 2132 tgl@sss.pgh.pa.us        2079   [ +  +  +  + ]:             90 :     if (QUEUE_POS_IS_ZERO(queue_head))
 2130                          2080                 :              7 :         slotno = SimpleLruZeroPage(NotifyCtl, pageno);
                               2081                 :                :     else
    2 heikki.linnakangas@i     2082                 :GNC          83 :         slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
                               2083                 :                : 
                               2084                 :                :     /* Note we mark the page dirty before writing in it */
 2130 tgl@sss.pgh.pa.us        2085                 :CBC          90 :     NotifyCtl->shared->page_dirty[slotno] = true;
                               2086                 :                : 
 5871                          2087         [ +  + ]:           1187 :     while (nextNotify != NULL)
                               2088                 :                :     {
 5861 bruce@momjian.us         2089                 :           1132 :         Notification *n = (Notification *) lfirst(nextNotify);
                               2090                 :                : 
                               2091                 :                :         /* Construct a valid queue entry in local variable qe */
 5871 tgl@sss.pgh.pa.us        2092                 :           1132 :         asyncQueueNotificationToEntry(n, &qe);
                               2093                 :                : 
 5007                          2094                 :           1132 :         offset = QUEUE_POS_OFFSET(queue_head);
                               2095                 :                : 
                               2096                 :                :         /* Check whether the entry really fits on the current page */
 5871                          2097         [ +  + ]:           1132 :         if (offset + qe.length <= QUEUE_PAGESIZE)
                               2098                 :                :         {
                               2099                 :                :             /* OK, so advance nextNotify past this item */
 2404                          2100                 :           1100 :             nextNotify = lnext(pendingNotifies->events, nextNotify);
                               2101                 :                :         }
                               2102                 :                :         else
                               2103                 :                :         {
                               2104                 :                :             /*
                               2105                 :                :              * Write a dummy entry to fill up the page.  Readers check only
                               2106                 :                :              * dboid, and since InvalidOid won't match any reader's database
                               2107                 :                :              * OID, they will ignore this entry and move on.
                               2108                 :                :              */
 5871                          2109                 :             32 :             qe.length = QUEUE_PAGESIZE - offset;
                               2110                 :             32 :             qe.dboid = InvalidOid;
  123 heikki.linnakangas@i     2111                 :             32 :             qe.xid = InvalidTransactionId;
 5861 bruce@momjian.us         2112                 :             32 :             qe.data[0] = '\0';  /* empty channel */
                               2113                 :             32 :             qe.data[1] = '\0';  /* empty payload */
                               2114                 :                :         }
                               2115                 :                : 
                               2116                 :                :         /* Now copy qe into the shared buffer page */
 2130 tgl@sss.pgh.pa.us        2117                 :           1132 :         memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
                               2118                 :                :                &qe,
 5871                          2119                 :           1132 :                qe.length);
                               2120                 :                : 
                               2121                 :                :         /* Advance queue_head appropriately, and detect if page is full */
 5007                          2122         [ +  + ]:           1132 :         if (asyncQueueAdvance(&(queue_head), qe.length))
                               2123                 :                :         {
                               2124                 :                :             LWLock     *lock;
                               2125                 :                : 
  746 alvherre@alvh.no-ip.     2126                 :             35 :             pageno = QUEUE_POS_PAGE(queue_head);
                               2127                 :             35 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
                               2128         [ -  + ]:             35 :             if (lock != prevlock)
                               2129                 :                :             {
  746 alvherre@alvh.no-ip.     2130                 :UBC           0 :                 LWLockRelease(prevlock);
                               2131                 :              0 :                 LWLockAcquire(lock, LW_EXCLUSIVE);
                               2132                 :              0 :                 prevlock = lock;
                               2133                 :                :             }
                               2134                 :                : 
                               2135                 :                :             /*
                               2136                 :                :              * Page is full, so we're done here, but first fill the next page
                               2137                 :                :              * with zeroes.  The reason to do this is to ensure that slru.c's
                               2138                 :                :              * idea of the head page is always the same as ours, which avoids
                               2139                 :                :              * boundary problems in SimpleLruTruncate.  The test in
                               2140                 :                :              * asyncQueueIsFull() ensured that there is room to create this
                               2141                 :                :              * page without overrunning the queue.
                               2142                 :                :              */
 2130 tgl@sss.pgh.pa.us        2143                 :CBC          35 :             slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
                               2144                 :                : 
                               2145                 :                :             /*
                               2146                 :                :              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
                               2147                 :                :              * set flag to remember that we should try to advance the tail
                               2148                 :                :              * pointer (we don't want to actually do that right here).
                               2149                 :                :              */
 2366                          2150         [ +  + ]:             35 :             if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
 1643                          2151                 :              8 :                 tryAdvanceTail = true;
                               2152                 :                : 
                               2153                 :                :             /* And exit the loop */
 9042                          2154                 :             35 :             break;
                               2155                 :                :         }
                               2156                 :                :     }
                               2157                 :                : 
                               2158                 :                :     /* Success, so update the global QUEUE_HEAD */
 5007                          2159                 :             90 :     QUEUE_HEAD = queue_head;
                               2160                 :                : 
  746 alvherre@alvh.no-ip.     2161                 :             90 :     LWLockRelease(prevlock);
                               2162                 :                : 
 5871 tgl@sss.pgh.pa.us        2163                 :             90 :     return nextNotify;
                               2164                 :                : }
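
The page-filling rule in the loop above can be modeled in isolation. This is a minimal sketch, not the PostgreSQL implementation: `DEMO_PAGESIZE`, `DemoPos`, `demo_advance`, and `demo_place` are hypothetical names, the page size is arbitrary, and the sketch treats a page as full only when the offset lands exactly on the page boundary, which may be simpler than what `asyncQueueAdvance` does.

```c
#include <assert.h>

/* Hypothetical page size; stands in for QUEUE_PAGESIZE. */
#define DEMO_PAGESIZE 64

/* Hypothetical local copy of the queue head (page number plus byte offset). */
typedef struct DemoPos
{
    long        page;
    int         offset;
} DemoPos;

/* Advance pos by len bytes; return 1 if that filled the page exactly. */
static int
demo_advance(DemoPos *pos, int len)
{
    assert(len > 0 && pos->offset + len <= DEMO_PAGESIZE);
    pos->offset += len;
    if (pos->offset == DEMO_PAGESIZE)
    {
        pos->page++;
        pos->offset = 0;
        return 1;
    }
    return 0;
}

/*
 * Place one entry of entry_len bytes at *head.  If it does not fit on the
 * current page, consume the rest of the page with padding instead and leave
 * the entry for the caller to retry.  Returns the bytes consumed; *placed is
 * 1 when the real entry was written, and *page_full is 1 when the page
 * filled up.
 */
static int
demo_place(DemoPos *head, int entry_len, int *placed, int *page_full)
{
    int         len;

    if (head->offset + entry_len <= DEMO_PAGESIZE)
    {
        len = entry_len;
        *placed = 1;
    }
    else
    {
        len = DEMO_PAGESIZE - head->offset; /* padding entry */
        *placed = 0;
    }
    *page_full = demo_advance(head, len);
    return len;
}
```

As in the listing, the caller would work on a local `DemoPos` and copy it back to the shared head only after every step has succeeded.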
                               2165                 :                : 
                               2166                 :                : /*
                               2167                 :                :  * SQL function to return the fraction of the notification queue currently
                               2168                 :                :  * occupied.
                               2169                 :                :  */
                               2170                 :                : Datum
 3894 rhaas@postgresql.org     2171                 :              5 : pg_notification_queue_usage(PG_FUNCTION_ARGS)
                               2172                 :                : {
                               2173                 :                :     double      usage;
                               2174                 :                : 
                               2175                 :                :     /* Advance the queue tail so we don't report a too-large result */
 2303 tgl@sss.pgh.pa.us        2176                 :              5 :     asyncQueueAdvanceTail();
                               2177                 :                : 
 2130                          2178                 :              5 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
 3894 rhaas@postgresql.org     2179                 :              5 :     usage = asyncQueueUsage();
 2130 tgl@sss.pgh.pa.us        2180                 :              5 :     LWLockRelease(NotifyQueueLock);
                               2181                 :                : 
 3894 rhaas@postgresql.org     2182                 :              5 :     PG_RETURN_FLOAT8(usage);
                               2183                 :                : }
                               2184                 :                : 
                               2185                 :                : /*
                               2186                 :                :  * Return the fraction of the queue that is currently occupied.
                               2187                 :                :  *
                               2188                 :                :  * The caller must hold NotifyQueueLock in (at least) shared mode.
                               2189                 :                :  *
                               2190                 :                :  * Note: we measure the distance to the logical tail page, not the physical
                               2191                 :                :  * tail page.  In some sense that's wrong, but the relative position of the
                               2192                 :                :  * physical tail is affected by details such as SLRU segment boundaries,
                               2193                 :                :  * so that a result based on that is unpleasantly unstable.
                               2194                 :                :  */
                               2195                 :                : static double
                               2196                 :             95 : asyncQueueUsage(void)
                               2197                 :                : {
  600 michael@paquier.xyz      2198                 :             95 :     int64       headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
                               2199                 :             95 :     int64       tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
                               2200                 :             95 :     int64       occupied = headPage - tailPage;
                               2201                 :                : 
 5871 tgl@sss.pgh.pa.us        2202         [ +  + ]:             95 :     if (occupied == 0)
 3894 rhaas@postgresql.org     2203                 :             56 :         return (double) 0;      /* fast exit for common case */
                               2204                 :                : 
  837 akorotkov@postgresql     2205                 :             39 :     return (double) occupied / (double) max_notify_queue_pages;
                               2206                 :                : }
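
The usage calculation above reduces to a ratio of page counts. The following is a small standalone sketch of that arithmetic; `demo_queue_usage` and its parameters are hypothetical stand-ins for the head page, the logical tail page, and `max_notify_queue_pages`, and no locking is modeled.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Fraction of a queue of max_pages pages that lies between tail_page and
 * head_page.  All names are illustrative only.
 */
static double
demo_queue_usage(int64_t head_page, int64_t tail_page, int max_pages)
{
    int64_t     occupied = head_page - tail_page;

    assert(max_pages > 0);

    if (occupied == 0)
        return 0.0;             /* common case: head and tail share a page */

    return (double) occupied / (double) max_pages;
}
```

With these assumptions, a head 512 pages ahead of the tail in a 1024-page queue gives 0.5, the threshold at which the fill warning in the next function becomes eligible.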
                               2207                 :                : 
                               2208                 :                : /*
                               2209                 :                :  * Check whether the queue is at least half full, and emit a warning if so.
                               2210                 :                :  *
                               2211                 :                :  * This is unlikely given the size of the queue, but possible.
                               2212                 :                :  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
                               2213                 :                :  *
                               2214                 :                :  * Caller must hold exclusive NotifyQueueLock.
                               2215                 :                :  */
                               2216                 :                : static void
 3894 rhaas@postgresql.org     2217                 :             90 : asyncQueueFillWarning(void)
                               2218                 :                : {
                               2219                 :                :     double      fillDegree;
                               2220                 :                :     TimestampTz t;
                               2221                 :                : 
                               2222                 :             90 :     fillDegree = asyncQueueUsage();
 5871 tgl@sss.pgh.pa.us        2223         [ +  - ]:             90 :     if (fillDegree < 0.5)
                               2224                 :             90 :         return;
                               2225                 :                : 
 5871 tgl@sss.pgh.pa.us        2226                 :UBC           0 :     t = GetCurrentTimestamp();
                               2227                 :                : 
                               2228         [ #  # ]:              0 :     if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn,
                               2229                 :                :                                    t, QUEUE_FULL_WARN_INTERVAL))
                               2230                 :                :     {
 5861 bruce@momjian.us         2231                 :              0 :         QueuePosition min = QUEUE_HEAD;
                               2232                 :              0 :         int32       minPid = InvalidPid;
                               2233                 :                : 
  742 heikki.linnakangas@i     2234         [ #  # ]:              0 :         for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
                               2235                 :                :         {
 2378 tgl@sss.pgh.pa.us        2236         [ #  # ]:              0 :             Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
                               2237   [ #  #  #  #  :              0 :             min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
                                              #  # ]
                               2238   [ #  #  #  # ]:              0 :             if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i)))
                               2239                 :              0 :                 minPid = QUEUE_BACKEND_PID(i);
                               2240                 :                :         }
                               2241                 :                : 
 5871                          2242   [ #  #  #  #  :              0 :         ereport(WARNING,
                                              #  # ]
                               2243                 :                :                 (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100),
                               2244                 :                :                  (minPid != InvalidPid ?
                               2245                 :                :                   errdetail("The server process with PID %d is among those with the oldest transactions.", minPid)
                               2246                 :                :                   : 0),
                               2247                 :                :                  (minPid != InvalidPid ?
                               2248                 :                :                   errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.")
                               2249                 :                :                   : 0)));
                               2250                 :                : 
                               2251                 :              0 :         asyncQueueControl->lastQueueFillWarn = t;
                               2252                 :                :     }
                               2253                 :                : }
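
The throttling in the function above (warn only when at least half full, and at most once per interval) can be sketched on its own. This is an illustration under stated assumptions: `demo_should_warn` is a hypothetical helper, time is a plain millisecond counter rather than a `TimestampTz`, and an elapsed time equal to the interval is treated as having exceeded it.

```c
#include <assert.h>

/*
 * Decide whether to emit a fill warning.  Returns 1 and records now_ms in
 * *last_warn_ms when a warning is due, otherwise returns 0 and leaves
 * *last_warn_ms untouched.  All names are illustrative only.
 */
static int
demo_should_warn(double fill, long now_ms, long *last_warn_ms,
                 long interval_ms)
{
    assert(last_warn_ms != 0);

    if (fill < 0.5)
        return 0;               /* queue is less than half full */

    if (now_ms - *last_warn_ms < interval_ms)
        return 0;               /* warned too recently */

    *last_warn_ms = now_ms;
    return 1;
}
```

Recording the timestamp only when a warning is actually issued mirrors the listing, where `lastQueueFillWarn` is updated inside the branch that calls `ereport`.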
                               2254                 :                : 
                               2255                 :                : /*
                               2256                 :                :  * Send signals to listening backends.
                               2257                 :                :  *
                               2258                 :                :  * Normally we signal only backends that are interested in the notifies that
                               2259                 :                :  * we just sent.  However, that will leave idle listeners falling further and
                               2260                 :                :  * further behind.  Waken them anyway if they're far enough behind, so they'll
                               2261                 :                :  * advance their queue position pointers, allowing the global tail to advance.
                               2262                 :                :  *
                               2263                 :                :  * Since we know the ProcNumber and the Pid, the signaling is quite cheap.
                               2264                 :                :  *
                               2265                 :                :  * This is called during CommitTransaction(), so it's important for it
                               2266                 :                :  * to have very low probability of failure.
                               2267                 :                :  */
                               2268                 :                : static void
 5871 tgl@sss.pgh.pa.us        2269                 :CBC          55 : SignalBackends(void)
                               2270                 :                : {
                               2271                 :                :     int         count;
                               2272                 :                : 
                               2273                 :                :     /* Can't get here without PreCommit_Notify having made the global table */
   59 tgl@sss.pgh.pa.us        2274         [ -  + ]:GNC          55 :     Assert(globalChannelTable != NULL);
                               2275                 :                : 
                               2276                 :                :     /* It should have set up these arrays, too */
                               2277   [ +  -  -  + ]:             55 :     Assert(signalPids != NULL && signalProcnos != NULL);
                               2278                 :                : 
                               2279                 :                :     /*
                               2280                 :                :      * Identify backends that we need to signal.  We don't want to send
                               2281                 :                :      * signals while holding the NotifyQueueLock, so this part just builds a
                               2282                 :                :      * list of target PIDs in signalPids[] and signalProcnos[].
                               2283                 :                :      */
 5871 tgl@sss.pgh.pa.us        2284                 :CBC          55 :     count = 0;
                               2285                 :                : 
 2130                          2286                 :             55 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
                               2287                 :                : 
                               2288                 :                :     /* Scan each channel name that we notified in this transaction */
   59 tgl@sss.pgh.pa.us        2289   [ +  -  +  +  :GNC         168 :     foreach_ptr(char, channel, pendingNotifies->uniqueChannelNames)
                                              +  + ]
                               2290                 :                :     {
                               2291                 :                :         GlobalChannelKey key;
                               2292                 :                :         GlobalChannelEntry *entry;
                               2293                 :                :         ListenerEntry *listeners;
                               2294                 :                : 
                               2295                 :             58 :         GlobalChannelKeyInit(&key, MyDatabaseId, channel);
                               2296                 :             58 :         entry = dshash_find(globalChannelTable, &key, false);
                               2297         [ +  + ]:             58 :         if (entry == NULL)
                               2298                 :             28 :             continue;           /* nobody is listening */
                               2299                 :                : 
                               2300                 :             30 :         listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
                               2301                 :                :                                                       entry->listenersArray);
                               2302                 :                : 
                               2303                 :                :         /* Identify listeners that now need waking, add them to arrays */
                               2304         [ +  + ]:             65 :         for (int j = 0; j < entry->numListeners; j++)
                               2305                 :                :         {
                               2306                 :                :             ProcNumber  i;
                               2307                 :                :             int32       pid;
                               2308                 :                :             QueuePosition pos;
                               2309                 :                : 
                               2310         [ -  + ]:             35 :             if (!listeners[j].listening)
                               2311                 :              3 :                 continue;       /* ignore not-yet-committed listeners */
                               2312                 :                : 
                               2313                 :             35 :             i = listeners[j].procNo;
                               2314                 :                : 
                               2315         [ +  + ]:             35 :             if (QUEUE_BACKEND_WAKEUP_PENDING(i))
                               2316                 :              3 :                 continue;       /* already signaled, no need to repeat */
                               2317                 :                : 
                               2318                 :             32 :             pid = QUEUE_BACKEND_PID(i);
                               2319                 :             32 :             pos = QUEUE_BACKEND_POS(i);
                               2320                 :                : 
                               2321   [ +  +  -  + ]:             32 :             if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
   59 tgl@sss.pgh.pa.us        2322                 :UNC           0 :                 continue;       /* it's fully caught up already */
                               2323                 :                : 
   59 tgl@sss.pgh.pa.us        2324         [ -  + ]:GNC          32 :             Assert(pid != InvalidPid);
                               2325                 :                : 
                               2326                 :             32 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
                               2327                 :             32 :             signalPids[count] = pid;
                               2328                 :             32 :             signalProcnos[count] = i;
                               2329                 :             32 :             count++;
                               2330                 :                :         }
                               2331                 :                : 
                               2332                 :             30 :         dshash_release_lock(globalChannelTable, entry);
                               2333                 :                :     }
                               2334                 :                : 
                               2335                 :                :     /*
                               2336                 :                :      * Scan all listeners.  Any that are not already pending wakeup must not
                               2337                 :                :      * be interested in our notifications (else we'd have set their wakeup
                               2338                 :                :      * flags above).  Check to see if we can directly advance their queue
                               2339                 :                :      * pointers to save a wakeup.  Otherwise, if they are far behind, wake
                               2340                 :                :      * them anyway so they will catch up.
                               2341                 :                :      */
  742 heikki.linnakangas@i     2342         [ +  + ]:CBC         108 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
                               2343                 :                :     {
                               2344                 :                :         int32       pid;
                               2345                 :                :         QueuePosition pos;
                               2346                 :                : 
   59 tgl@sss.pgh.pa.us        2347         [ +  + ]:GNC          53 :         if (QUEUE_BACKEND_WAKEUP_PENDING(i))
                               2348                 :             32 :             continue;
                               2349                 :                : 
                               2350                 :                :         /* If it's currently advancing, we should not touch it */
                               2351         [ -  + ]:             21 :         if (QUEUE_BACKEND_IS_ADVANCING(i))
   59 tgl@sss.pgh.pa.us        2352                 :UNC           0 :             continue;
                               2353                 :                : 
   59 tgl@sss.pgh.pa.us        2354                 :GNC          21 :         pid = QUEUE_BACKEND_PID(i);
 2366 tgl@sss.pgh.pa.us        2355                 :CBC          21 :         pos = QUEUE_BACKEND_POS(i);
                               2356                 :                : 
                               2357                 :                :         /*
                               2358                 :                :          * We can directly advance the other backend's queue pointer if it's
                               2359                 :                :          * not currently advancing (else there are race conditions), and its
                               2360                 :                :          * current pointer is not behind queueHeadBeforeWrite (else we'd make
                               2361                 :                :          * it miss some older messages), and we'd not be moving the pointer
                               2362                 :                :          * backward.
                               2363                 :                :          */
   59 tgl@sss.pgh.pa.us        2364   [ +  -  +  -  :GNC          42 :         if (!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite) &&
                                        +  -  +  + ]
                               2365   [ +  -  +  - ]:             31 :             QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
                               2366                 :                :         {
                               2367                 :                :             /* We can directly advance its pointer past what we wrote */
                               2368                 :             21 :             QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
                               2369                 :                :         }
   59 tgl@sss.pgh.pa.us        2370         [ #  # ]:UNC           0 :         else if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
                               2371                 :                :                                     QUEUE_POS_PAGE(pos)) >= QUEUE_CLEANUP_DELAY)
                               2372                 :                :         {
                               2373                 :                :             /* It's idle and far behind, so wake it up */
                               2374         [ #  # ]:              0 :             Assert(pid != InvalidPid);
                               2375                 :                : 
                               2376                 :              0 :             QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
                               2377                 :              0 :             signalPids[count] = pid;
                               2378                 :              0 :             signalProcnos[count] = i;
                               2379                 :              0 :             count++;
                               2380                 :                :         }
                               2381                 :                :     }
                               2382                 :                : 
 2130 tgl@sss.pgh.pa.us        2383                 :CBC          55 :     LWLockRelease(NotifyQueueLock);
                               2384                 :                : 
                               2385                 :                :     /* Now send signals */
 2378                          2386         [ +  + ]:             87 :     for (int i = 0; i < count; i++)
                               2387                 :                :     {
   59 tgl@sss.pgh.pa.us        2388                 :GNC          32 :         int32       pid = signalPids[i];
                               2389                 :                : 
                               2390                 :                :         /*
                               2391                 :                :          * If we are signaling our own process, no need to involve the kernel;
                               2392                 :                :          * just set the flag directly.
                               2393                 :                :          */
 1643 tgl@sss.pgh.pa.us        2394         [ +  + ]:CBC          32 :         if (pid == MyProcPid)
                               2395                 :                :         {
                               2396                 :             21 :             notifyInterruptPending = true;
                               2397                 :             21 :             continue;
                               2398                 :                :         }
                               2399                 :                : 
                               2400                 :                :         /*
                               2401                 :                :          * Note: assuming things aren't broken, a signal failure here could
                               2402                 :                :          * only occur if the target backend exited since we released
                               2403                 :                :          * NotifyQueueLock, which is unlikely but certainly possible. So we
                               2404                 :                :          * just log a low-level debug message if it happens.
                               2405                 :                :          */
   59 tgl@sss.pgh.pa.us        2406         [ -  + ]:GNC          11 :         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
 5871 tgl@sss.pgh.pa.us        2407         [ #  # ]:UBC           0 :             elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
                               2408                 :                :     }
10022 tgl@sss.pgh.pa.us        2409                 :CBC          55 : }
                               2410                 :                : 
                               2411                 :                : /*
                               2412                 :                :  * AtAbort_Notify
                               2413                 :                :  *
                               2414                 :                :  *  This is called at transaction abort.
                               2415                 :                :  *
                               2416                 :                :  *  Revert any staged listen/unlisten changes and clean up transaction state.
                               2417                 :                :  *  This only does anything if we abort after PreCommit_Notify has staged
                               2418                 :                :  *  some entries.
                               2419                 :                :  */
                               2420                 :                : void
 9037                          2421                 :          26742 : AtAbort_Notify(void)
                               2422                 :                : {
                               2423                 :                :     /* Revert staged listen/unlisten changes */
   59 tgl@sss.pgh.pa.us        2424                 :GNC       26742 :     ApplyPendingListenActions(false);
                               2425                 :                : 
                               2426                 :                :     /* If we're no longer listening on anything, unregister */
                               2427   [ -  +  -  -  :          26742 :     if (amRegisteredListener && LocalChannelTableIsEmpty())
                                              -  - ]
 4778 tgl@sss.pgh.pa.us        2428                 :UBC           0 :         asyncQueueUnregister();
                               2429                 :                : 
                               2430                 :                :     /* And clean up */
 6577 tgl@sss.pgh.pa.us        2431                 :CBC       26742 :     ClearPendingActionsAndNotifies();
10022                          2432                 :          26742 : }
                               2433                 :                : 
                               2434                 :                : /*
                               2435                 :                :  * AtSubCommit_Notify() --- Take care of subtransaction commit.
                               2436                 :                :  *
                               2437                 :                :  * Reassign all items in the pending lists to the parent transaction.
                               2438                 :                :  */
                               2439                 :                : void
 7927                          2440                 :           6982 : AtSubCommit_Notify(void)
                               2441                 :                : {
 2354 rhaas@postgresql.org     2442                 :           6982 :     int         my_level = GetCurrentTransactionNestLevel();
                               2443                 :                : 
                               2444                 :                :     /* If there are actions at our nesting level, we must reparent them. */
                               2445         [ +  + ]:           6982 :     if (pendingActions != NULL &&
 2354 rhaas@postgresql.org     2446         [ +  - ]:GBC           2 :         pendingActions->nestingLevel >= my_level)
                               2447                 :                :     {
                               2448         [ +  + ]:              2 :         if (pendingActions->upper == NULL ||
                               2449         [ -  + ]:              1 :             pendingActions->upper->nestingLevel < my_level - 1)
                               2450                 :                :         {
                               2451                 :                :             /* nothing to merge; give the whole thing to the parent */
                               2452                 :              1 :             --pendingActions->nestingLevel;
                               2453                 :                :         }
                               2454                 :                :         else
                               2455                 :                :         {
                               2456                 :              1 :             ActionList *childPendingActions = pendingActions;
                               2457                 :                : 
                               2458                 :              1 :             pendingActions = pendingActions->upper;
                               2459                 :                : 
                               2460                 :                :             /*
                               2461                 :                :              * Mustn't try to eliminate duplicates here --- see queue_listen()
                               2462                 :                :              */
                               2463                 :              2 :             pendingActions->actions =
                               2464                 :              1 :                 list_concat(pendingActions->actions,
                               2465                 :              1 :                             childPendingActions->actions);
                               2466                 :              1 :             pfree(childPendingActions);
                               2467                 :                :         }
                               2468                 :                :     }
                               2469                 :                : 
                               2470                 :                :     /* If there are notifies at our nesting level, we must reparent them. */
 2354 rhaas@postgresql.org     2471         [ +  + ]:CBC        6982 :     if (pendingNotifies != NULL &&
                               2472         [ +  + ]:              3 :         pendingNotifies->nestingLevel >= my_level)
                               2473                 :                :     {
                               2474         [ -  + ]:              2 :         Assert(pendingNotifies->nestingLevel == my_level);
                               2475                 :                : 
                               2476         [ +  + ]:              2 :         if (pendingNotifies->upper == NULL ||
                               2477         [ -  + ]:              1 :             pendingNotifies->upper->nestingLevel < my_level - 1)
                               2478                 :                :         {
                               2479                 :                :             /* nothing to merge; give the whole thing to the parent */
 2354 rhaas@postgresql.org     2480                 :GBC           1 :             --pendingNotifies->nestingLevel;
                               2481                 :                :         }
                               2482                 :                :         else
                               2483                 :                :         {
                               2484                 :                :             /*
                               2485                 :                :              * Formerly, we didn't bother to eliminate duplicates here, but
                               2486                 :                :              * now we must, else we fall foul of "Assert(!found)", either here
                               2487                 :                :              * or during a later attempt to build the parent-level hashtable.
                               2488                 :                :              */
 2354 rhaas@postgresql.org     2489                 :CBC           1 :             NotificationList *childPendingNotifies = pendingNotifies;
                               2490                 :                :             ListCell   *l;
                               2491                 :                : 
                               2492                 :              1 :             pendingNotifies = pendingNotifies->upper;
                               2493                 :                :             /* Insert all the subxact's events into parent, except for dups */
                               2494   [ +  -  +  +  :              5 :             foreach(l, childPendingNotifies->events)
                                              +  + ]
                               2495                 :                :             {
                               2496                 :              4 :                 Notification *childn = (Notification *) lfirst(l);
                               2497                 :                : 
                               2498         [ +  + ]:              4 :                 if (!AsyncExistsPendingNotify(childn))
                               2499                 :              2 :                     AddEventToPendingNotifies(childn);
                               2500                 :                :             }
                               2501                 :              1 :             pfree(childPendingNotifies);
                               2502                 :                :         }
                               2503                 :                :     }
 7927 tgl@sss.pgh.pa.us        2504                 :           6982 : }
                               2505                 :                : 
                               2506                 :                : /*
                               2507                 :                :  * AtSubAbort_Notify() --- Take care of subtransaction abort.
                               2508                 :                :  */
                               2509                 :                : void
                               2510                 :           4703 : AtSubAbort_Notify(void)
                               2511                 :                : {
 7860                          2512                 :           4703 :     int         my_level = GetCurrentTransactionNestLevel();
                               2513                 :                : 
                               2514                 :                :     /*
                               2515                 :                :      * All we have to do is pop the stack --- the actions/notifies made in
                               2516                 :                :      * this subxact are no longer interesting, and the space will be freed
                               2517                 :                :      * when CurTransactionContext is recycled. We still have to free the
                               2518                 :                :      * ActionList and NotificationList objects themselves, though, because
                               2519                 :                :      * those are allocated in TopTransactionContext.
                               2520                 :                :      *
                               2521                 :                :      * Note that there might be no entries at all, or no entries for the
                               2522                 :                :      * current subtransaction level, either because none were ever created, or
                               2523                 :                :      * because we reentered this routine due to trouble during subxact abort.
                               2524                 :                :      */
 2354 rhaas@postgresql.org     2525         [ +  + ]:           4704 :     while (pendingActions != NULL &&
 2354 rhaas@postgresql.org     2526         [ +  - ]:GBC           1 :            pendingActions->nestingLevel >= my_level)
                               2527                 :                :     {
                               2528                 :              1 :         ActionList *childPendingActions = pendingActions;
                               2529                 :                : 
                               2530                 :              1 :         pendingActions = pendingActions->upper;
                               2531                 :              1 :         pfree(childPendingActions);
                               2532                 :                :     }
                               2533                 :                : 
 2354 rhaas@postgresql.org     2534         [ +  + ]:CBC        4704 :     while (pendingNotifies != NULL &&
                               2535         [ +  + ]:              2 :            pendingNotifies->nestingLevel >= my_level)
                               2536                 :                :     {
                               2537                 :              1 :         NotificationList *childPendingNotifies = pendingNotifies;
                               2538                 :                : 
                               2539                 :              1 :         pendingNotifies = pendingNotifies->upper;
                               2540                 :              1 :         pfree(childPendingNotifies);
                               2541                 :                :     }
 7927 tgl@sss.pgh.pa.us        2542                 :           4703 : }
                               2543                 :                : 
                               2544                 :                : /*
                               2545                 :                :  * HandleNotifyInterrupt
                               2546                 :                :  *
                               2547                 :                :  *      Signal handler portion of interrupt handling. Let the backend know
                               2548                 :                :  *      that there's a pending notify interrupt. If we're currently reading
                               2549                 :                :  *      from the client, this will interrupt the read and
                               2550                 :                :  *      ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
                               2551                 :                :  */
                               2552                 :                : void
 6071                          2553                 :             11 : HandleNotifyInterrupt(void)
                               2554                 :                : {
                               2555                 :                :     /*
                               2556                 :                :      * Note: this is called by a SIGNAL HANDLER. You must be very wary what
                               2557                 :                :      * you do here.
                               2558                 :                :      */
                               2559                 :                : 
                               2560                 :                :     /* signal that work needs to be done */
 4058 andres@anarazel.de       2561                 :             11 :     notifyInterruptPending = true;
                               2562                 :                : 
                               2563                 :                :     /* make sure the event is processed in due course */
                               2564                 :             11 :     SetLatch(MyLatch);
10022 tgl@sss.pgh.pa.us        2565                 :             11 : }
                               2566                 :                : 
                               2567                 :                : /*
                               2568                 :                :  * ProcessNotifyInterrupt
                               2569                 :                :  *
                               2570                 :                :  *      This is called if we see notifyInterruptPending set, just before
                               2571                 :                :  *      transmitting ReadyForQuery at the end of a frontend command, and
                               2572                 :                :  *      also if a notify signal occurs while reading from the frontend.
                               2573                 :                :  *      HandleNotifyInterrupt() will cause the read to be interrupted
                               2574                 :                :  *      via the process's latch, and this routine will get called.
                               2575                 :                :  *      If we are truly idle (ie, *not* inside a transaction block),
                               2576                 :                :  *      process the incoming notifies.
                               2577                 :                :  *
                               2578                 :                :  *      If "flush" is true, force any frontend messages out immediately.
                               2579                 :                :  *      This can be false when being called at the end of a frontend command,
                               2580                 :                :  *      since we'll flush after sending ReadyForQuery.
                               2581                 :                :  */
                               2582                 :                : void
 1643                          2583                 :             38 : ProcessNotifyInterrupt(bool flush)
                               2584                 :                : {
 8186                          2585         [ +  + ]:             38 :     if (IsTransactionOrTransactionBlock())
10022                          2586                 :              7 :         return;                 /* not really idle */
                               2587                 :                : 
                               2588                 :                :     /* Loop in case another signal arrives while sending messages */
 4058 andres@anarazel.de       2589         [ +  + ]:             62 :     while (notifyInterruptPending)
 1643 tgl@sss.pgh.pa.us        2590                 :             31 :         ProcessIncomingNotify(flush);
                               2591                 :                : }
                               2592                 :                : 
                               2593                 :                : 
                               2594                 :                : /*
                               2595                 :                :  * Read all pending notifications from the queue, and deliver appropriate
                               2596                 :                :  * ones to my frontend.  Stop when we reach queue head or an uncommitted
                               2597                 :                :  * notification.
                               2598                 :                :  */
                               2599                 :                : static void
 5871                          2600                 :             46 : asyncQueueReadAllNotifications(void)
                               2601                 :                : {
                               2602                 :                :     QueuePosition pos;
                               2603                 :                :     QueuePosition head;
                               2604                 :                :     Snapshot    snapshot;
                               2605                 :                : 
                               2606                 :                :     /*
                               2607                 :                :      * Fetch current state, indicate to others that we have woken up, and that
                               2608                 :                :      * we are in process of advancing our position.
                               2609                 :                :      */
 2130                          2610                 :             46 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
                               2611                 :                :     /* Assert checks that we have a valid state entry */
  742 heikki.linnakangas@i     2612         [ -  + ]:             46 :     Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
   59 tgl@sss.pgh.pa.us        2613                 :GNC          46 :     QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
  742 heikki.linnakangas@i     2614                 :CBC          46 :     pos = QUEUE_BACKEND_POS(MyProcNumber);
 5871 tgl@sss.pgh.pa.us        2615                 :             46 :     head = QUEUE_HEAD;
                               2616                 :                : 
                               2617   [ +  +  -  + ]:             46 :     if (QUEUE_POS_EQUAL(pos, head))
                               2618                 :                :     {
                               2619                 :                :         /* Nothing to do, we have read all notifications already. */
   59 tgl@sss.pgh.pa.us        2620                 :UNC           0 :         LWLockRelease(NotifyQueueLock);
 5871 tgl@sss.pgh.pa.us        2621                 :UBC           0 :         return;
                               2622                 :                :     }
                               2623                 :                : 
   59 tgl@sss.pgh.pa.us        2624                 :GNC          46 :     QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
                               2625                 :             46 :     LWLockRelease(NotifyQueueLock);
                               2626                 :                : 
                               2627                 :                :     /*----------
                               2628                 :                :      * Get snapshot we'll use to decide which xacts are still in progress.
                               2629                 :                :      * This is trickier than it might seem, because of race conditions.
                               2630                 :                :      * Consider the following example:
                               2631                 :                :      *
                               2632                 :                :      * Backend 1:                    Backend 2:
                               2633                 :                :      *
                               2634                 :                :      * transaction starts
                               2635                 :                :      * UPDATE foo SET ...;
                               2636                 :                :      * NOTIFY foo;
                               2637                 :                :      * commit starts
                               2638                 :                :      * queue the notify message
                               2639                 :                :      *                               transaction starts
                               2640                 :                :      *                               LISTEN foo;  -- first LISTEN in session
                               2641                 :                :      *                               SELECT * FROM foo WHERE ...;
                               2642                 :                :      * commit to clog
                               2643                 :                :      *                               commit starts
                               2644                 :                :      *                               add backend 2 to array of listeners
                               2645                 :                :      *                               advance to queue head (this code)
                               2646                 :                :      *                               commit to clog
                               2647                 :                :      *
                               2648                 :                :      * Transaction 2's SELECT has not seen the UPDATE's effects, since that
                               2649                 :                :      * wasn't committed yet.  Ideally we'd ensure that client 2 would
                               2650                 :                :      * eventually get transaction 1's notify message, but there's no way
                               2651                 :                :      * to do that; until we're in the listener array, there's no guarantee
                               2652                 :                :      * that the notify message doesn't get removed from the queue.
                               2653                 :                :      *
                               2654                 :                :      * Therefore the coding technique transaction 2 is using is unsafe:
                               2655                 :                :      * applications must commit a LISTEN before inspecting database state,
                               2656                 :                :      * if they want to ensure they will see notifications about subsequent
                               2657                 :                :      * changes to that state.
                               2658                 :                :      *
                               2659                 :                :      * What we do guarantee is that we'll see all notifications from
                               2660                 :                :      * transactions committing after the snapshot we take here.
                               2661                 :                :      * BecomeRegisteredListener has already added us to the listener array,
                               2662                 :                :      * so no not-yet-committed messages can be removed from the queue
                               2663                 :                :      * before we see them.
                               2664                 :                :      *----------
                               2665                 :                :      */
 2303 tgl@sss.pgh.pa.us        2666                 :CBC          46 :     snapshot = RegisterSnapshot(GetLatestSnapshot());
                               2667                 :                : 
                               2668                 :                :     /*
                               2669                 :                :      * It is possible that we fail while trying to send a message to our
                               2670                 :                :      * frontend (for example, because of encoding conversion failure).  If
                               2671                 :                :      * that happens it is critical that we not try to send the same message
                               2672                 :                :      * over and over again.  Therefore, we set ExitOnAnyError to upgrade any
                               2673                 :                :      * ERRORs to FATAL, causing the client connection to be closed on error.
                               2674                 :                :      *
                               2675                 :                :      * We used to only skip over the offending message and try to soldier on,
                               2676                 :                :      * but it was somewhat questionable to lose a notification and give the
                               2677                 :                :      * client an ERROR instead.  A client application is not prepared for
                               2678                 :                :      * that and can't tell that a notification was missed.  It was also not
                               2679                 :                :      * very useful in practice because notifications are often processed while
                               2680                 :                :      * a connection is idle and reading a message from the client, and in that
                               2681                 :                :      * state, any error is upgraded to FATAL anyway.  Closing the connection
                               2682                 :                :      * is a clear signal to the application that it might have missed
                               2683                 :                :      * notifications.
                               2684                 :                :      */
                               2685                 :                :     {
  123 heikki.linnakangas@i     2686                 :             46 :         bool        save_ExitOnAnyError = ExitOnAnyError;
                               2687                 :                :         bool        reachedStop;
                               2688                 :                : 
                               2689                 :             46 :         ExitOnAnyError = true;
                               2690                 :                : 
                               2691                 :                :         do
                               2692                 :                :         {
                               2693                 :                :             /*
                               2694                 :                :              * Process messages up to the stop position, end of page, or an
                               2695                 :                :              * uncommitted message.
                               2696                 :                :              *
                               2697                 :                :              * Our stop position is what we found to be the head's position
                               2698                 :                :              * when we entered this function. It might have changed already.
                               2699                 :                :              * But if it has, we will receive (or have already received and
                               2700                 :                :              * queued) another signal and come here again.
                               2701                 :                :              *
                               2702                 :                :              * We are not holding NotifyQueueLock here! The queue can only
                               2703                 :                :              * extend beyond the head pointer (see above) and we leave our
                               2704                 :                :              * backend's pointer where it is so nobody will truncate or
                               2705                 :                :              * rewrite pages under us. In particular, we don't want to hold a lock
                               2706                 :                :              * while sending the notifications to the frontend.
                               2707                 :                :              */
                               2708                 :             49 :             reachedStop = asyncQueueProcessPageEntries(&pos, head, snapshot);
 5871 tgl@sss.pgh.pa.us        2709         [ +  + ]:             49 :         } while (!reachedStop);
                               2710                 :                : 
                               2711                 :                :         /* Update shared state */
 2130                          2712                 :             46 :         LWLockAcquire(NotifyQueueLock, LW_SHARED);
  742 heikki.linnakangas@i     2713                 :             46 :         QUEUE_BACKEND_POS(MyProcNumber) = pos;
   59 tgl@sss.pgh.pa.us        2714                 :GNC          46 :         QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
 2130 tgl@sss.pgh.pa.us        2715                 :CBC          46 :         LWLockRelease(NotifyQueueLock);
                               2716                 :                : 
  123 heikki.linnakangas@i     2717                 :             46 :         ExitOnAnyError = save_ExitOnAnyError;
                               2718                 :                :     }
                               2719                 :                : 
                               2720                 :                :     /* Done with snapshot */
 3077 tgl@sss.pgh.pa.us        2721                 :             46 :     UnregisterSnapshot(snapshot);
                               2722                 :                : }
                               2723                 :                : 
                               2724                 :                : /*
                               2725                 :                :  * Fetch notifications from the shared queue, beginning at position current,
                               2726                 :                :  * and deliver relevant ones to my frontend.
                               2727                 :                :  *
                               2728                 :                :  * The function returns true once we have reached the stop position or an
                               2729                 :                :  * uncommitted notification, and false if we have finished with the page.
                               2730                 :                :  * In other words: once it returns true there is no need to look further.
                               2731                 :                :  * The QueuePosition *current is advanced past all processed messages.
                               2732                 :                :  */
                               2733                 :                : static bool
  123 heikki.linnakangas@i     2734                 :             49 : asyncQueueProcessPageEntries(QueuePosition *current,
                               2735                 :                :                              QueuePosition stop,
                               2736                 :                :                              Snapshot snapshot)
                               2737                 :                : {
                               2738                 :             49 :     int64       curpage = QUEUE_POS_PAGE(*current);
                               2739                 :                :     int         slotno;
                               2740                 :                :     char       *page_buffer;
 5871 tgl@sss.pgh.pa.us        2741                 :             49 :     bool        reachedStop = false;
                               2742                 :                :     bool        reachedEndOfPage;
                               2743                 :                : 
                               2744                 :                :     /*
                               2745                 :                :      * We copy the entries into a local buffer to avoid holding the SLRU lock
                               2746                 :                :      * while we transmit them to our frontend.  The local buffer must be
                               2747                 :                :      * adequately aligned.
                               2748                 :                :      */
                               2749                 :                :     alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
  114 peter@eisentraut.org     2750                 :GNC          49 :     char       *local_buf_end = local_buf;
                               2751                 :                : 
    2 heikki.linnakangas@i     2752                 :             49 :     slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, current);
  123 heikki.linnakangas@i     2753                 :CBC          49 :     page_buffer = NotifyCtl->shared->page_buffer[slotno];
                               2754                 :                : 
                               2755                 :                :     do
                               2756                 :                :     {
 5861 bruce@momjian.us         2757                 :           1327 :         QueuePosition thisentry = *current;
                               2758                 :                :         AsyncQueueEntry *qe;
                               2759                 :                : 
 5870 tgl@sss.pgh.pa.us        2760   [ +  +  +  + ]:           1327 :         if (QUEUE_POS_EQUAL(thisentry, stop))
 5871                          2761                 :             46 :             break;
                               2762                 :                : 
 5870                          2763                 :           1281 :         qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
                               2764                 :                : 
                               2765                 :                :         /*
                               2766                 :                :          * Advance *current over this message, possibly to the next page. As
                               2767                 :                :          * noted in the comments for asyncQueueReadAllNotifications, we must
                               2768                 :                :          * do this before possibly failing while processing the message.
                               2769                 :                :          */
 5871                          2770                 :           1281 :         reachedEndOfPage = asyncQueueAdvance(current, qe->length);
                               2771                 :                : 
                               2772                 :                :         /* Ignore messages destined for other databases */
                               2773         [ +  - ]:           1281 :         if (qe->dboid == MyDatabaseId)
                               2774                 :                :         {
 3077                          2775         [ -  + ]:           1281 :             if (XidInMVCCSnapshot(qe->xid, snapshot))
                               2776                 :                :             {
                               2777                 :                :                 /*
                               2778                 :                :                  * The source transaction is still in progress, so we can't
                               2779                 :                :                  * process this message yet.  Break out of the loop, but first
                               2780                 :                :                  * back up *current so we will reprocess the message next
                               2781                 :                :                  * time.  (Note: it is unlikely but not impossible for
                               2782                 :                :                  * TransactionIdDidCommit to fail, so we can't really avoid
                               2783                 :                :                  * this advance-then-back-up behavior when dealing with an
                               2784                 :                :                  * uncommitted message.)
                               2785                 :                :                  *
                               2786                 :                :                  * Note that we must test XidInMVCCSnapshot before we test
                               2787                 :                :                  * TransactionIdDidCommit, else we might return a message from
                               2788                 :                :                  * a transaction that is not yet visible to snapshots; compare
                               2789                 :                :                  * the comments at the head of heapam_visibility.c.
                               2790                 :                :                  *
                               2791                 :                :                  * Also, while our own xact won't be listed in the snapshot,
                               2792                 :                :                  * we need not check for TransactionIdIsCurrentTransactionId
                               2793                 :                :                  * because our transaction cannot (yet) have queued any
                               2794                 :                :                  * messages.
                               2795                 :                :                  */
 4385 tgl@sss.pgh.pa.us        2796                 :UBC           0 :                 *current = thisentry;
                               2797                 :              0 :                 reachedStop = true;
                               2798                 :              0 :                 break;
                               2799                 :                :             }
                               2800                 :                : 
                               2801                 :                :             /*
                               2802                 :                :              * Quick check for the case that we're not listening on any
                               2803                 :                :              * channels, before calling TransactionIdDidCommit().  This makes
                               2804                 :                :              * that case a little faster, but more importantly, it ensures
                               2805                 :                :              * that if there's a bad entry in the queue for which
                               2806                 :                :              * TransactionIdDidCommit() fails for some reason, we can skip
                               2807                 :                :              * over it on the first LISTEN in a session, and not get stuck on
                               2808                 :                :              * it indefinitely.  (This is a little trickier than it looks: it
                               2809                 :                :              * works because BecomeRegisteredListener runs this code before we
                               2810                 :                :              * have made the first entry in localChannelTable.)
                               2811                 :                :              */
   59 tgl@sss.pgh.pa.us        2812   [ +  -  +  + ]:GNC        1281 :             if (LocalChannelTableIsEmpty())
  123 heikki.linnakangas@i     2813                 :CBC        1226 :                 continue;
                               2814                 :                : 
                               2815         [ +  - ]:             55 :             if (TransactionIdDidCommit(qe->xid))
                               2816                 :                :             {
                               2817                 :             55 :                 memcpy(local_buf_end, qe, qe->length);
                               2818                 :             55 :                 local_buf_end += qe->length;
                               2819                 :                :             }
                               2820                 :                :             else
                               2821                 :                :             {
                               2822                 :                :                 /*
                               2823                 :                :                  * The source transaction aborted or crashed, so we just
                               2824                 :                :                  * ignore its notifications.
                               2825                 :                :                  */
                               2826                 :                :             }
                               2827                 :                :         }
                               2828                 :                : 
                               2829                 :                :         /* Loop back if we're not at end of page */
 5871 tgl@sss.pgh.pa.us        2830         [ +  + ]:           1281 :     } while (!reachedEndOfPage);
                               2831                 :                : 
                               2832                 :                :     /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
  123 heikki.linnakangas@i     2833                 :             49 :     LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
                               2834                 :                : 
                               2835                 :                :     /*
                               2836                 :                :      * Now that we have let go of the SLRU bank lock, send the notifications
                               2837                 :                :      * to our frontend
                               2838                 :                :      */
  114 peter@eisentraut.org     2839         [ -  + ]:GNC          49 :     Assert(local_buf_end - local_buf <= BLCKSZ);
                               2840         [ +  + ]:            104 :     for (char *p = local_buf; p < local_buf_end;)
                               2841                 :                :     {
  123 heikki.linnakangas@i     2842                 :CBC          55 :         AsyncQueueEntry *qe = (AsyncQueueEntry *) p;
                               2843                 :                : 
                               2844                 :                :         /* qe->data is the null-terminated channel name */
                               2845                 :             55 :         char       *channel = qe->data;
                               2846                 :                : 
                               2847         [ +  - ]:             55 :         if (IsListeningOn(channel))
                               2848                 :                :         {
                               2849                 :                :             /* payload follows channel name */
                               2850                 :             55 :             char       *payload = qe->data + strlen(channel) + 1;
                               2851                 :                : 
                               2852                 :             55 :             NotifyMyFrontEnd(channel, payload, qe->srcPid);
                               2853                 :                :         }
                               2854                 :                : 
                               2855                 :             55 :         p += qe->length;
                               2856                 :                :     }
                               2857                 :                : 
 5871 tgl@sss.pgh.pa.us        2858   [ +  +  +  + ]:             49 :     if (QUEUE_POS_EQUAL(*current, stop))
                               2859                 :             46 :         reachedStop = true;
                               2860                 :                : 
                               2861                 :             49 :     return reachedStop;
                               2862                 :                : }
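
The delivery loop above reads each copied entry as a channel name immediately followed by its payload, both null-terminated. The following is a minimal sketch of that layout only. The names `demo_pack` and `demo_payload` are hypothetical helpers made up for illustration; they are not part of async.c and ignore the entry header fields (length, dboid, xid, srcPid).

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: write "channel\0payload\0" into buf.
 * Returns the number of bytes used, or 0 if the strings do not fit.
 */
static size_t
demo_pack(char *buf, size_t bufsize, const char *channel, const char *payload)
{
    size_t      clen = strlen(channel) + 1; /* include terminator */
    size_t      plen = strlen(payload) + 1; /* include terminator */

    assert(buf != NULL);
    if (clen + plen > bufsize)
        return 0;
    memcpy(buf, channel, clen);
    memcpy(buf + clen, payload, plen);
    return clen + plen;
}

/*
 * Hypothetical helper: given a pointer to the channel name, return a
 * pointer to the payload that follows it, mirroring
 * "qe->data + strlen(channel) + 1" in the listing.
 */
static const char *
demo_payload(const char *data)
{
    return data + strlen(data) + 1;
}
```

Packing "jobs" and "id=7" into a buffer and then calling `demo_payload` on it yields a pointer to "id=7", which is the same pointer arithmetic the loop uses before calling NotifyMyFrontEnd.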
                               2863                 :                : 
                               2864                 :                : /*
                               2865                 :                :  * Advance the shared queue tail variable to the minimum of all the
                               2866                 :                :  * per-backend tail pointers.  Truncate pg_notify space if possible.
                               2867                 :                :  *
                               2868                 :                :  * This is (usually) called during CommitTransaction(), so it's important for
                               2869                 :                :  * it to have very low probability of failure.
                               2870                 :                :  */
                               2871                 :                : static void
                               2872                 :             13 : asyncQueueAdvanceTail(void)
                               2873                 :                : {
                               2874                 :                :     QueuePosition min;
                               2875                 :                :     int64       oldtailpage;
                               2876                 :                :     int64       newtailpage;
                               2877                 :                :     int64       boundary;
                               2878                 :                : 
                               2879                 :                :     /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
 2038 noah@leadboat.com        2880                 :             13 :     LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
                               2881                 :                : 
                               2882                 :                :     /*
                               2883                 :                :      * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
                               2884                 :                :      * (ie, exactly match at least one backend's queue position), so it must
                               2885                 :                :      * be updated atomically with the actual computation.  Since v13, we could
                               2886                 :                :      * get away with not doing it like that, but it seems prudent to keep it
                               2887                 :                :      * so.
                               2888                 :                :      *
                               2889                 :                :      * Also, because incoming backends will scan forward from QUEUE_TAIL, that
                               2890                 :                :      * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
                               2891                 :                :      * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
                               2892                 :                :      * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
                               2893                 :                :      * there are pages we can truncate but haven't yet finished doing so.
                               2894                 :                :      *
                               2895                 :                :      * For concurrency's sake, we don't want to hold NotifyQueueLock while
                               2896                 :                :      * performing SimpleLruTruncate.  This is OK because no backend will try
                               2897                 :                :      * to access the pages we are in the midst of truncating.
                               2898                 :                :      */
 2130 tgl@sss.pgh.pa.us        2899                 :             13 :     LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
 5871                          2900                 :             13 :     min = QUEUE_HEAD;
  742 heikki.linnakangas@i     2901         [ +  + ]:             23 :     for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
                               2902                 :                :     {
 2378 tgl@sss.pgh.pa.us        2903         [ -  + ]:             10 :         Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
                               2904   [ +  -  +  +  :             10 :         min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
                                              +  - ]
                               2905                 :                :     }
 1933                          2906                 :             13 :     QUEUE_TAIL = min;
                               2907                 :             13 :     oldtailpage = QUEUE_STOP_PAGE;
 2130                          2908                 :             13 :     LWLockRelease(NotifyQueueLock);
                               2909                 :                : 
                               2910                 :                :     /*
                               2911                 :                :      * We can truncate something if the global tail advanced across an SLRU
                               2912                 :                :      * segment boundary.
                               2913                 :                :      *
                               2914                 :                :      * XXX it might be better to truncate only once every several segments, to
                               2915                 :                :      * reduce the number of directory scans.
                               2916                 :                :      */
 5871                          2917                 :             13 :     newtailpage = QUEUE_POS_PAGE(min);
                               2918                 :             13 :     boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
 5282 alvherre@alvh.no-ip.     2919         [ +  + ]:             13 :     if (asyncQueuePagePrecedes(oldtailpage, boundary))
                               2920                 :                :     {
                               2921                 :                :         /*
                               2922                 :                :          * SimpleLruTruncate() will acquire SLRU bank locks but will also
                               2923                 :                :          * release them again.
                               2924                 :                :          */
 2130 tgl@sss.pgh.pa.us        2925                 :GBC           1 :         SimpleLruTruncate(NotifyCtl, newtailpage);
                               2926                 :                : 
 1933                          2927                 :              1 :         LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
                               2928                 :              1 :         QUEUE_STOP_PAGE = newtailpage;
                               2929                 :              1 :         LWLockRelease(NotifyQueueLock);
                               2930                 :                :     }
                               2931                 :                : 
 2038 noah@leadboat.com        2932                 :CBC          13 :     LWLockRelease(NotifyQueueTailLock);
 5871 tgl@sss.pgh.pa.us        2933                 :             13 : }
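
The truncation test in asyncQueueAdvanceTail rounds the new tail page down to a segment boundary and truncates only if the old stop page precedes that boundary. The following is a rough sketch of that arithmetic under stated assumptions: `DEMO_PAGES_PER_SEGMENT` is an illustrative value standing in for SLRU_PAGES_PER_SEGMENT, and `demo_can_truncate` uses a plain `<` comparison as a simplified stand-in for asyncQueuePagePrecedes(). Both function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed value for illustration; the real constant is defined by the SLRU code. */
#define DEMO_PAGES_PER_SEGMENT 32

/* Round a non-negative page number down to the first page of its segment. */
static int64_t
demo_segment_boundary(int64_t page)
{
    assert(page >= 0);
    return page - (page % DEMO_PAGES_PER_SEGMENT);
}

/*
 * Simplified stand-in for the truncation check: true only when the old
 * stop page lies before the segment that contains the new tail page,
 * i.e. the tail has crossed at least one segment boundary.
 */
static bool
demo_can_truncate(int64_t old_stop_page, int64_t new_tail_page)
{
    return old_stop_page < demo_segment_boundary(new_tail_page);
}
```

With these assumptions, a tail that moves from page 0 to page 31 stays inside the first segment and nothing is truncated, while a tail at page 32 makes the whole first segment removable. This matches the low hit count on the SimpleLruTruncate() line in the listing: most tail advances do not cross a boundary.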
                               2934                 :                : 
                               2935                 :                : /*
                               2936                 :                :  * AsyncNotifyFreezeXids
                               2937                 :                :  *
                               2938                 :                :  * Prepare the async notification queue for CLOG truncation by freezing
                               2939                 :                :  * transaction IDs that are about to become inaccessible.
                               2940                 :                :  *
                               2941                 :                :  * This function is called by VACUUM before advancing datfrozenxid. It scans
                               2942                 :                :  * the notification queue and replaces XIDs that would become inaccessible
                               2943                 :                :  * after CLOG truncation with special markers:
                               2944                 :                :  * - Committed transactions are set to FrozenTransactionId
                               2945                 :                :  * - Aborted/crashed transactions are set to InvalidTransactionId
                               2946                 :                :  *
                               2947                 :                :  * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG
                               2948                 :                :  * pages will be truncated. If XID < newFrozenXid, it cannot still be running
                               2949                 :                :  * (or it would have held back newFrozenXid through ProcArray).
                               2950                 :                :  * Therefore, if TransactionIdDidCommit returns false, we know the transaction
                               2951                 :                :  * either aborted explicitly or crashed, and we can safely mark it invalid.
                               2952                 :                :  */
                               2953                 :                : void
  123 heikki.linnakangas@i     2954                 :            103 : AsyncNotifyFreezeXids(TransactionId newFrozenXid)
                               2955                 :                : {
                               2956                 :                :     QueuePosition pos;
                               2957                 :                :     QueuePosition head;
                               2958                 :            103 :     int64       curpage = -1;
                               2959                 :            103 :     int         slotno = -1;
                               2960                 :            103 :     char       *page_buffer = NULL;
                               2961                 :            103 :     bool        page_dirty = false;
                               2962                 :                : 
                               2963                 :                :     /*
                               2964                 :                :      * Acquire locks in the correct order to avoid deadlocks. As per the
                               2965                 :                :      * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then SLRU
                               2966                 :                :      * bank locks.
                               2967                 :                :      *
                               2968                 :                :      * We only need SHARED mode since we're just reading the head/tail
                               2969                 :                :      * positions, not modifying them.
                               2970                 :                :      */
                               2971                 :            103 :     LWLockAcquire(NotifyQueueTailLock, LW_SHARED);
                               2972                 :            103 :     LWLockAcquire(NotifyQueueLock, LW_SHARED);
                               2973                 :                : 
                               2974                 :            103 :     pos = QUEUE_TAIL;
                               2975                 :            103 :     head = QUEUE_HEAD;
                               2976                 :                : 
                               2977                 :                :     /* Release NotifyQueueLock early; we only needed to read the positions */
                               2978                 :            103 :     LWLockRelease(NotifyQueueLock);
                               2979                 :                : 
                               2980                 :                :     /*
                               2981                 :                :      * Scan the queue from tail to head, freezing XIDs as needed. We hold
                               2982                 :                :      * NotifyQueueTailLock throughout to ensure the tail doesn't move while
                               2983                 :                :      * we're working.
                               2984                 :                :      */
                               2985   [ -  +  -  + ]:            103 :     while (!QUEUE_POS_EQUAL(pos, head))
                               2986                 :                :     {
                               2987                 :                :         AsyncQueueEntry *qe;
                               2988                 :                :         TransactionId xid;
  123 heikki.linnakangas@i     2989                 :UBC           0 :         int64       pageno = QUEUE_POS_PAGE(pos);
                               2990                 :              0 :         int         offset = QUEUE_POS_OFFSET(pos);
                               2991                 :                : 
                               2992                 :                :         /* If we need a different page, release old lock and get new one */
                               2993         [ #  # ]:              0 :         if (pageno != curpage)
                               2994                 :                :         {
                               2995                 :                :             LWLock     *lock;
                               2996                 :                : 
                               2997                 :                :             /* Release previous page if any */
                               2998         [ #  # ]:              0 :             if (slotno >= 0)
                               2999                 :                :             {
                               3000         [ #  # ]:              0 :                 if (page_dirty)
                               3001                 :                :                 {
                               3002                 :              0 :                     NotifyCtl->shared->page_dirty[slotno] = true;
                               3003                 :              0 :                     page_dirty = false;
                               3004                 :                :                 }
                               3005                 :              0 :                 LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
                               3006                 :                :             }
                               3007                 :                : 
                               3008                 :              0 :             lock = SimpleLruGetBankLock(NotifyCtl, pageno);
                               3009                 :              0 :             LWLockAcquire(lock, LW_EXCLUSIVE);
    2 heikki.linnakangas@i     3010                 :UNC           0 :             slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
  123 heikki.linnakangas@i     3011                 :UBC           0 :             page_buffer = NotifyCtl->shared->page_buffer[slotno];
                               3012                 :              0 :             curpage = pageno;
                               3013                 :                :         }
                               3014                 :                : 
                               3015                 :              0 :         qe = (AsyncQueueEntry *) (page_buffer + offset);
                               3016                 :              0 :         xid = qe->xid;
                               3017                 :                : 
                               3018   [ #  #  #  # ]:              0 :         if (TransactionIdIsNormal(xid) &&
                               3019                 :              0 :             TransactionIdPrecedes(xid, newFrozenXid))
                               3020                 :                :         {
                               3021         [ #  # ]:              0 :             if (TransactionIdDidCommit(xid))
                               3022                 :                :             {
                               3023                 :              0 :                 qe->xid = FrozenTransactionId;
                               3024                 :              0 :                 page_dirty = true;
                               3025                 :                :             }
                               3026                 :                :             else
                               3027                 :                :             {
                               3028                 :              0 :                 qe->xid = InvalidTransactionId;
                               3029                 :              0 :                 page_dirty = true;
                               3030                 :                :             }
                               3031                 :                :         }
                               3032                 :                : 
                               3033                 :                :         /* Advance to next entry */
                               3034                 :              0 :         asyncQueueAdvance(&pos, qe->length);
                               3035                 :                :     }
                               3036                 :                : 
                               3037                 :                :     /* Release final page lock if we acquired one */
  123 heikki.linnakangas@i     3038         [ -  + ]:CBC         103 :     if (slotno >= 0)
                               3039                 :                :     {
  123 heikki.linnakangas@i     3040         [ #  # ]:UBC           0 :         if (page_dirty)
                               3041                 :              0 :             NotifyCtl->shared->page_dirty[slotno] = true;
                               3042                 :              0 :         LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
                               3043                 :                :     }
                               3044                 :                : 
  123 heikki.linnakangas@i     3045                 :CBC         103 :     LWLockRelease(NotifyQueueTailLock);
                               3046                 :            103 : }
                               3047                 :                : 
                               3048                 :                : /*
                               3049                 :                :  * ProcessIncomingNotify
                               3050                 :                :  *
                               3051                 :                :  *      Scan the queue for arriving notifications and report them to the front
                               3052                 :                :  *      end.  The notifications might be from other sessions, or our own;
                               3053                 :                :  *      there's no need to distinguish here.
                               3054                 :                :  *
                               3055                 :                :  *      If "flush" is true, force any frontend messages out immediately.
                               3056                 :                :  *
                               3057                 :                :  *      NOTE: since we are outside any transaction, we must create our own.
                               3058                 :                :  */
                               3059                 :                : static void
 1643 tgl@sss.pgh.pa.us        3060                 :             31 : ProcessIncomingNotify(bool flush)
                               3061                 :                : {
                               3062                 :                :     /* We *must* reset the flag */
 4058 andres@anarazel.de       3063                 :             31 :     notifyInterruptPending = false;
                               3064                 :                : 
                               3065                 :                :     /* Do nothing else if we aren't actively listening */
   59 tgl@sss.pgh.pa.us        3066   [ +  -  -  + ]:GNC          31 :     if (LocalChannelTableIsEmpty())
 5871 tgl@sss.pgh.pa.us        3067                 :UBC           0 :         return;
                               3068                 :                : 
 9419 peter_e@gmx.net          3069         [ -  + ]:CBC          31 :     if (Trace_notify)
 8328 bruce@momjian.us         3070         [ #  # ]:UBC           0 :         elog(DEBUG1, "ProcessIncomingNotify");
                               3071                 :                : 
 2195 peter@eisentraut.org     3072                 :CBC          31 :     set_ps_display("notify interrupt");
                               3073                 :                : 
                               3074                 :                :     /*
                               3075                 :                :      * We must run asyncQueueReadAllNotifications inside a transaction, else
                               3076                 :                :      * bad things happen if it gets an error.
                               3077                 :                :      */
 5871 tgl@sss.pgh.pa.us        3078                 :             31 :     StartTransactionCommand();
                               3079                 :                : 
                               3080                 :             31 :     asyncQueueReadAllNotifications();
                               3081                 :                : 
 8341                          3082                 :             31 :     CommitTransactionCommand();
                               3083                 :                : 
                               3084                 :                :     /*
                               3085                 :                :      * If this isn't an end-of-command case, we must flush the notify messages
                               3086                 :                :      * to ensure frontend gets them promptly.
                               3087                 :                :      */
 1643                          3088         [ +  + ]:             31 :     if (flush)
                               3089                 :              9 :         pq_flush();
                               3090                 :                : 
 2195 peter@eisentraut.org     3091                 :             31 :     set_ps_display("idle");
                               3092                 :                : 
 9419 peter_e@gmx.net          3093         [ -  + ]:             31 :     if (Trace_notify)
 8328 bruce@momjian.us         3094         [ #  # ]:UBC           0 :         elog(DEBUG1, "ProcessIncomingNotify: done");
                               3095                 :                : }
                               3096                 :                : 
                               3097                 :                : /*
                               3098                 :                :  * Send NOTIFY message to my front end.
                               3099                 :                :  */
                               3100                 :                : void
 5871 tgl@sss.pgh.pa.us        3101                 :CBC          55 : NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
                               3102                 :                : {
 7437 alvherre@alvh.no-ip.     3103         [ +  - ]:             55 :     if (whereToSendOutput == DestRemote)
                               3104                 :                :     {
                               3105                 :                :         StringInfoData buf;
                               3106                 :                : 
  936 nathan@postgresql.or     3107                 :             55 :         pq_beginmessage(&buf, PqMsg_NotificationResponse);
 3077 andres@anarazel.de       3108                 :             55 :         pq_sendint32(&buf, srcPid);
 5871 tgl@sss.pgh.pa.us        3109                 :             55 :         pq_sendstring(&buf, channel);
 1837 heikki.linnakangas@i     3110                 :             55 :         pq_sendstring(&buf, payload);
 9821 tgl@sss.pgh.pa.us        3111                 :             55 :         pq_endmessage(&buf);
                               3112                 :                : 
                               3113                 :                :         /*
                               3114                 :                :          * NOTE: we do not do pq_flush() here.  Some level of caller will
                               3115                 :                :          * handle it later, allowing this message to be combined into a packet
                               3116                 :                :          * with other ones.
                               3117                 :                :          */
                               3118                 :                :     }
                               3119                 :                :     else
 5871 tgl@sss.pgh.pa.us        3120         [ #  # ]:UBC           0 :         elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload);
10022 tgl@sss.pgh.pa.us        3121                 :CBC          55 : }
                               3122                 :                : 
                               3123                 :                : /* Does pendingNotifies include a match for the given event? */
                               3124                 :                : static bool
 2404                          3125                 :           1066 : AsyncExistsPendingNotify(Notification *n)
                               3126                 :                : {
                               3127         [ -  + ]:           1066 :     if (pendingNotifies == NULL)
 5871 tgl@sss.pgh.pa.us        3128                 :UBC           0 :         return false;
                               3129                 :                : 
 2404 tgl@sss.pgh.pa.us        3130         [ +  + ]:CBC        1066 :     if (pendingNotifies->hashtab != NULL)
                               3131                 :                :     {
                               3132                 :                :         /* Use the hash table to probe for a match */
                               3133         [ +  + ]:            984 :         if (hash_search(pendingNotifies->hashtab,
                               3134                 :                :                         &n,
                               3135                 :                :                         HASH_FIND,
                               3136                 :                :                         NULL))
 2404 tgl@sss.pgh.pa.us        3137                 :GBC           1 :             return true;
                               3138                 :                :     }
                               3139                 :                :     else
                               3140                 :                :     {
                               3141                 :                :         /* Must scan the event list */
                               3142                 :                :         ListCell   *l;
                               3143                 :                : 
 2404 tgl@sss.pgh.pa.us        3144   [ +  -  +  +  :CBC         425 :         foreach(l, pendingNotifies->events)
                                              +  + ]
                               3145                 :                :         {
                               3146                 :            357 :             Notification *oldn = (Notification *) lfirst(l);
                               3147                 :                : 
                               3148         [ +  - ]:            357 :             if (n->channel_len == oldn->channel_len &&
                               3149         [ +  + ]:            357 :                 n->payload_len == oldn->payload_len &&
                               3150                 :            190 :                 memcmp(n->data, oldn->data,
                               3151         [ +  + ]:            190 :                        n->channel_len + n->payload_len + 2) == 0)
                               3152                 :             14 :                 return true;
                               3153                 :                :         }
                               3154                 :                :     }
                               3155                 :                : 
                               3156                 :           1051 :     return false;
                               3157                 :                : }
                               3158                 :                : 
                               3159                 :                : /*
                               3160                 :                :  * Add a notification event to a pre-existing pendingNotifies list.
                               3161                 :                :  *
                               3162                 :                :  * Because pendingNotifies->events is already nonempty, this works
                               3163                 :                :  * correctly no matter what CurrentMemoryContext is.
                               3164                 :                :  */
                               3165                 :                : static void
                               3166                 :           1051 : AddEventToPendingNotifies(Notification *n)
                               3167                 :                : {
                               3168         [ -  + ]:           1051 :     Assert(pendingNotifies->events != NIL);
                               3169                 :                : 
                               3170                 :                :     /* Create the hash tables if it's time to */
                               3171         [ +  + ]:           1051 :     if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
                               3172         [ +  + ]:            985 :         pendingNotifies->hashtab == NULL)
                               3173                 :                :     {
                               3174                 :                :         HASHCTL     hash_ctl;
                               3175                 :                :         ListCell   *l;
                               3176                 :                : 
                               3177                 :                :         /* Create the hash table */
                               3178                 :              2 :         hash_ctl.keysize = sizeof(Notification *);
  667 peter@eisentraut.org     3179                 :              2 :         hash_ctl.entrysize = sizeof(struct NotificationHash);
 2404 tgl@sss.pgh.pa.us        3180                 :              2 :         hash_ctl.hash = notification_hash;
                               3181                 :              2 :         hash_ctl.match = notification_match;
                               3182                 :              2 :         hash_ctl.hcxt = CurTransactionContext;
                               3183                 :              4 :         pendingNotifies->hashtab =
                               3184                 :              2 :             hash_create("Pending Notifies",
                               3185                 :                :                         256L,
                               3186                 :                :                         &hash_ctl,
                               3187                 :                :                         HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
                               3188                 :                : 
                               3189                 :                :         /* Create the unique channel name table */
   59 tgl@sss.pgh.pa.us        3190         [ -  + ]:GNC           2 :         Assert(pendingNotifies->uniqueChannelHash == NULL);
                               3191                 :              2 :         hash_ctl.keysize = NAMEDATALEN;
                               3192                 :              2 :         hash_ctl.entrysize = sizeof(ChannelName);
                               3193                 :              2 :         hash_ctl.hcxt = CurTransactionContext;
                               3194                 :              4 :         pendingNotifies->uniqueChannelHash =
                               3195                 :              2 :             hash_create("Pending Notify Channel Names",
                               3196                 :                :                         64L,
                               3197                 :                :                         &hash_ctl,
                               3198                 :                :                         HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
                               3199                 :                : 
                               3200                 :                :         /* Insert all the already-existing events */
 2404 tgl@sss.pgh.pa.us        3201   [ +  -  +  +  :CBC          34 :         foreach(l, pendingNotifies->events)
                                              +  + ]
                               3202                 :                :         {
                               3203                 :             32 :             Notification *oldn = (Notification *) lfirst(l);
   59 tgl@sss.pgh.pa.us        3204                 :GNC          32 :             char       *channel = oldn->data;
                               3205                 :                :             bool        found;
                               3206                 :                : 
  836 john.naylor@postgres     3207                 :CBC          32 :             (void) hash_search(pendingNotifies->hashtab,
                               3208                 :                :                                &oldn,
                               3209                 :                :                                HASH_ENTER,
                               3210                 :                :                                &found);
 2404 tgl@sss.pgh.pa.us        3211         [ -  + ]:             32 :             Assert(!found);
                               3212                 :                : 
                               3213                 :                :             /* Add channel name to uniqueChannelHash; might be there already */
   59 tgl@sss.pgh.pa.us        3214                 :GNC          32 :             (void) hash_search(pendingNotifies->uniqueChannelHash,
                               3215                 :                :                                channel,
                               3216                 :                :                                HASH_ENTER,
                               3217                 :                :                                NULL);
                               3218                 :                :         }
                               3219                 :                :     }
                               3220                 :                : 
                               3221                 :                :     /* Add new event to the list, in order */
 2404 tgl@sss.pgh.pa.us        3222                 :CBC        1051 :     pendingNotifies->events = lappend(pendingNotifies->events, n);
                               3223                 :                : 
                               3224                 :                :     /* Add event to the hash tables if needed */
                               3225         [ +  + ]:           1051 :     if (pendingNotifies->hashtab != NULL)
                               3226                 :                :     {
   59 tgl@sss.pgh.pa.us        3227                 :GNC         985 :         char       *channel = n->data;
                               3228                 :                :         bool        found;
                               3229                 :                : 
  836 john.naylor@postgres     3230                 :CBC         985 :         (void) hash_search(pendingNotifies->hashtab,
                               3231                 :                :                            &n,
                               3232                 :                :                            HASH_ENTER,
                               3233                 :                :                            &found);
 2404 tgl@sss.pgh.pa.us        3234         [ -  + ]:            985 :         Assert(!found);
                               3235                 :                : 
                               3236                 :                :         /* Add channel name to uniqueChannelHash; might be there already */
   59 tgl@sss.pgh.pa.us        3237                 :GNC         985 :         (void) hash_search(pendingNotifies->uniqueChannelHash,
                               3238                 :                :                            channel,
                               3239                 :                :                            HASH_ENTER,
                               3240                 :                :                            NULL);
                               3241                 :                :     }
 2404 tgl@sss.pgh.pa.us        3242                 :CBC        1051 : }
                               3243                 :                : 
                               3244                 :                : /*
                               3245                 :                :  * notification_hash: hash function for notification hash table
                               3246                 :                :  *
                               3247                 :                :  * The hash "keys" are pointers to Notification structs.
                               3248                 :                :  */
                               3249                 :                : static uint32
                               3250                 :           2001 : notification_hash(const void *key, Size keysize)
                               3251                 :                : {
                               3252                 :           2001 :     const Notification *k = *(const Notification *const *) key;
                               3253                 :                : 
                               3254         [ -  + ]:           2001 :     Assert(keysize == sizeof(Notification *));
                               3255                 :                :     /* We don't bother to include the payload's trailing null in the hash */
                               3256                 :           2001 :     return DatumGetUInt32(hash_any((const unsigned char *) k->data,
                               3257                 :           2001 :                                    k->channel_len + k->payload_len + 1));
                               3258                 :                : }
                               3259                 :                : 
                               3260                 :                : /*
                               3261                 :                :  * notification_match: match function to use with notification_hash
                               3262                 :                :  */
                               3263                 :                : static int
 2404 tgl@sss.pgh.pa.us        3264                 :GBC           1 : notification_match(const void *key1, const void *key2, Size keysize)
                               3265                 :                : {
                               3266                 :              1 :     const Notification *k1 = *(const Notification *const *) key1;
                               3267                 :              1 :     const Notification *k2 = *(const Notification *const *) key2;
                               3268                 :                : 
                               3269         [ -  + ]:              1 :     Assert(keysize == sizeof(Notification *));
                               3270         [ +  - ]:              1 :     if (k1->channel_len == k2->channel_len &&
                               3271         [ +  - ]:              1 :         k1->payload_len == k2->payload_len &&
                               3272                 :              1 :         memcmp(k1->data, k2->data,
                               3273         [ +  - ]:              1 :                k1->channel_len + k1->payload_len + 2) == 0)
                               3274                 :              1 :         return 0;               /* equal */
 2404 tgl@sss.pgh.pa.us        3275                 :UBC           0 :     return 1;                   /* not equal */
                               3276                 :                : }
                               3277                 :                : 
                               3278                 :                : /* Clear the pendingActions and pendingNotifies lists. */
                               3279                 :                : static void
 6577 tgl@sss.pgh.pa.us        3280                 :CBC       26888 : ClearPendingActionsAndNotifies(void)
                               3281                 :                : {
                               3282                 :                :     /*
                               3283                 :                :      * Everything's allocated in either TopTransactionContext or the context
                               3284                 :                :      * for the subtransaction to which it corresponds.  So, there's nothing to
                               3285                 :                :      * do here except reset the pointers; the space will be reclaimed when the
                               3286                 :                :      * contexts are deleted.
                               3287                 :                :      */
 2354 rhaas@postgresql.org     3288                 :          26888 :     pendingActions = NULL;
 2404 tgl@sss.pgh.pa.us        3289                 :          26888 :     pendingNotifies = NULL;
                               3290                 :                :     /* Also clear pendingListenActions, which is derived from pendingActions */
   59 tgl@sss.pgh.pa.us        3291                 :GNC       26888 :     pendingListenActions = NULL;
10416 bruce@momjian.us         3292                 :CBC       26888 : }
                               3293                 :                : 
                               3294                 :                : /*
                               3295                 :                :  * GUC check_hook for notify_buffers
                               3296                 :                :  */
                               3297                 :                : bool
  746 alvherre@alvh.no-ip.     3298                 :           1184 : check_notify_buffers(int *newval, void **extra, GucSource source)
                               3299                 :                : {
                               3300                 :           1184 :     return check_slru_buffers("notify_buffers", newval);
                               3301                 :                : }

Generated by: LCOV version 2.4-beta