LCOV - differential code coverage report
Current view: top level - src/backend/storage/ipc - shm_mq.c (source / functions)
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7
Current Date: 2026-03-14 14:10:32 -0400
Baseline: lcov-20260315-024220-baseline
Baseline Date: 2026-03-14 15:27:56 +0100
Legend: Lines: hit, not hit
        Branches: + taken, - not taken, # not executed

Coverage Total Hit UBC GNC CBC EUB ECB DCB
Lines: 89.7 % 390 350 40 1 349 1
Functions: 95.2 % 21 20 1 2 18
Branches: 74.1 % 224 166 58 166 5 9

Line coverage date bins:
(30,360] days: 100.0 % 1 1 1
(360..) days: 89.7 % 389 349 40 349
Function coverage date bins:
(360..) days: 95.2 % 21 20 1 2 18
Branch coverage date bins:
(360..) days: 69.7 % 238 166 58 166 5 9

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * shm_mq.c
                                  4                 :                :  *    single-reader, single-writer shared memory message queue
                                  5                 :                :  *
                                  6                 :                :  * Both the sender and the receiver must have a PGPROC; their respective
                                  7                 :                :  * process latches are used for synchronization.  Only the sender may send,
                                  8                 :                :  * and only the receiver may receive.  This is intended to allow a user
                                  9                 :                :  * backend to communicate with worker backends that it has registered.
                                 10                 :                :  *
                                 11                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                 12                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 13                 :                :  *
                                 14                 :                :  * src/backend/storage/ipc/shm_mq.c
                                 15                 :                :  *
                                 16                 :                :  *-------------------------------------------------------------------------
                                 17                 :                :  */
                                 18                 :                : 
                                 19                 :                : #include "postgres.h"
                                 20                 :                : 
                                 21                 :                : #include "miscadmin.h"
                                 22                 :                : #include "pgstat.h"
                                 23                 :                : #include "port/pg_bitutils.h"
                                 24                 :                : #include "postmaster/bgworker.h"
                                 25                 :                : #include "storage/proc.h"
                                 26                 :                : #include "storage/shm_mq.h"
                                 27                 :                : #include "storage/spin.h"
                                 28                 :                : #include "utils/memutils.h"
                                 29                 :                : #include "utils/wait_event.h"
                                 30                 :                : 
                                 31                 :                : /*
                                 32                 :                :  * This structure represents the actual queue, stored in shared memory.
                                 33                 :                :  *
                                 34                 :                :  * Some notes on synchronization:
                                 35                 :                :  *
                                 36                 :                :  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
                                 37                 :                :  * mq_sender and mq_bytes_written can only be changed by the sender.
                                 38                 :                :  * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
                                 39                 :                :  * they cannot change once set, and thus may be read without a lock once this
                                 40                 :                :  * is known to be the case.
                                 41                 :                :  *
                                 42                 :                :  * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
                                 43                 :                :  * they are accessed atomically using 8-byte loads and stores.  Memory barriers
                                 44                 :                :  * must be carefully used to synchronize reads and writes of these values with
                                 45                 :                :  * reads and writes of the actual data in mq_ring.
                                 46                 :                :  *
                                 47                 :                :  * mq_detached needs no locking.  It can be set by either the sender or the
                                 48                 :                :  * receiver, but only ever from false to true, so redundant writes don't
                                 49                 :                :  * matter.  It is important that if we set mq_detached and then set the
                                 50                 :                :  * counterparty's latch, the counterparty must be certain to see the change
                                 51                 :                :  * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
                                 52                 :                :  * ends with one, this should be OK.
                                 53                 :                :  *
                                 54                 :                :  * mq_ring_size and mq_ring_offset never change after initialization, and
                                 55                 :                :  * can therefore be read without the lock.
                                 56                 :                :  *
                                 57                 :                :  * Importantly, mq_ring can be safely read and written without a lock.
                                 58                 :                :  * At any given time, the difference between mq_bytes_read and
                                 59                 :                :  * mq_bytes_written defines the number of bytes within mq_ring that contain
                                 60                 :                :  * unread data, and mq_bytes_read defines the position where those bytes
                                 61                 :                :  * begin.  The sender can increase the number of unread bytes at any time,
                                 62                 :                :  * but only the receiver can give license to overwrite those bytes, by
                                 63                 :                :  * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
                                 64                 :                :  * the unread bytes it knows to be present without the lock.  Conversely,
                                 65                 :                :  * the sender can write to the unused portion of the ring buffer without
                                 66                 :                :  * the lock, because nobody else can be reading or writing those bytes.  The
                                 67                 :                :  * receiver could be making more bytes unused by incrementing mq_bytes_read,
                                 68                 :                :  * but that's OK.  Note that it would be unsafe for the receiver to read any
                                 69                 :                :  * data it's already marked as read, or to write any data; and it would be
                                 70                 :                :  * unsafe for the sender to reread any data after incrementing
                                 71                 :                :  * mq_bytes_written, but fortunately there's no need for any of that.
                                 72                 :                :  */
                                 73                 :                : struct shm_mq
                                 74                 :                : {
                                 75                 :                :     slock_t     mq_mutex;
                                 76                 :                :     PGPROC     *mq_receiver;
                                 77                 :                :     PGPROC     *mq_sender;
                                 78                 :                :     pg_atomic_uint64 mq_bytes_read;
                                 79                 :                :     pg_atomic_uint64 mq_bytes_written;
                                 80                 :                :     Size        mq_ring_size;
                                 81                 :                :     bool        mq_detached;
                                 82                 :                :     uint8       mq_ring_offset;
                                 83                 :                :     char        mq_ring[FLEXIBLE_ARRAY_MEMBER];
                                 84                 :                : };
                                 85                 :                : 
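
The counter scheme described in the comment above can be modelled in a single process. The following is a minimal sketch, not the code under test: toy_ring, toy_send, toy_recv and TOY_RING_SIZE are hypothetical names, plain integers stand in for the atomic counters, and no memory barriers are shown. It only illustrates that the unread span is bytes_written - bytes_read and that ring positions are the counters taken modulo the ring size.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOY_RING_SIZE 8         /* hypothetical; the real size is set at creation */

/* Hypothetical single-process model of the two-counter ring. */
typedef struct toy_ring
{
    uint64_t    bytes_read;     /* advanced only by the receiver */
    uint64_t    bytes_written;  /* advanced only by the sender */
    char        ring[TOY_RING_SIZE];
} toy_ring;

/* Copy up to n bytes into the unused part of the ring; return bytes copied. */
static size_t
toy_send(toy_ring *q, const char *data, size_t n)
{
    uint64_t    used = q->bytes_written - q->bytes_read;
    size_t      sent = 0;

    assert(used <= TOY_RING_SIZE);
    if (n > TOY_RING_SIZE - used)
        n = (size_t) (TOY_RING_SIZE - used);

    while (sent < n)
    {
        size_t      offset = (size_t) (q->bytes_written % TOY_RING_SIZE);
        size_t      chunk = TOY_RING_SIZE - offset;   /* room before the wrap */

        if (chunk > n - sent)
            chunk = n - sent;
        memcpy(&q->ring[offset], data + sent, chunk);
        q->bytes_written += chunk;
        sent += chunk;
    }
    return sent;
}

/* Copy up to n unread bytes out of the ring; return bytes copied. */
static size_t
toy_recv(toy_ring *q, char *out, size_t n)
{
    uint64_t    used = q->bytes_written - q->bytes_read;
    size_t      got = 0;

    assert(used <= TOY_RING_SIZE);
    if (n > used)
        n = (size_t) used;

    while (got < n)
    {
        size_t      offset = (size_t) (q->bytes_read % TOY_RING_SIZE);
        size_t      chunk = TOY_RING_SIZE - offset;

        if (chunk > n - got)
            chunk = n - got;
        memcpy(out + got, &q->ring[offset], chunk);
        q->bytes_read += chunk;     /* gives the sender license to overwrite */
        got += chunk;
    }
    return got;
}
```

In this model the sender never touches bytes between bytes_read and bytes_written, and the receiver never reads outside that span, which is the property the comment relies on to skip the lock.
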
                                 86                 :                : /*
                                 87                 :                :  * This structure is a backend-private handle for access to a queue.
                                 88                 :                :  *
                                 89                 :                :  * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
                                 90                 :                :  * an optional pointer to the dynamic shared memory segment that contains it.
                                 91                 :                :  * (If mqh_segment is provided, we register an on_dsm_detach callback to
                                 92                 :                :  * make sure we detach from the queue before detaching from DSM.)
                                 93                 :                :  *
                                 94                 :                :  * If this queue is intended to connect the current process with a background
                                 95                 :                :  * worker that started it, the user can pass a pointer to the worker handle
                                 96                 :                :  * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
                                 97                 :                :  * is to allow us to begin sending to or receiving from that queue before the
                                 98                 :                :  * process we'll be communicating with has even been started.  If it fails
                                 99                 :                :  * to start, the handle will allow us to notice that and fail cleanly, rather
                                100                 :                :  * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
                                101                 :                :  * simple cases - e.g. where there are just 2 processes communicating; in
                                102                 :                :  * more complex scenarios, every process may not have a BackgroundWorkerHandle
                                103                 :                :  * available, or may need to watch for the failure of more than one other
                                104                 :                :  * process at a time.
                                105                 :                :  *
                                106                 :                :  * When a message exists as a contiguous chunk of bytes in the queue - that is,
                                107                 :                :  * it is smaller than the size of the ring buffer and does not wrap around
                                108                 :                :  * the end - we return the message to the caller as a pointer into the buffer.
                                109                 :                :  * For messages that are larger or happen to wrap, we reassemble the message
                                110                 :                :  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
                                111                 :                :  * the buffer, and mqh_buflen is the number of bytes allocated for it.
                                112                 :                :  *
                                113                 :                :  * mqh_send_pending is the number of bytes that have been written to the
                                114                 :                :  * queue but not yet reflected in shared memory.  We do not update shared
                                115                 :                :  * memory until the pending data reaches 1/4 of the ring size or the queue
                                116                 :                :  * is full.  This prevents frequent CPU cache misses and also avoids frequent
                                117                 :                :  * SetLatch() calls, which are quite expensive.
                                118                 :                :  *
                                119                 :                :  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
                                120                 :                :  * are used to track the state of non-blocking operations.  When the caller
                                121                 :                :  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
                                122                 :                :  * are expected to retry the call at a later time with the same argument;
                                123                 :                :  * we need to retain enough state to pick up where we left off.
                                124                 :                :  * mqh_length_word_complete tracks whether we are done sending or receiving
                                125                 :                :  * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
                                126                 :                :  * the number of bytes read or written for either the length word or the
                                127                 :                :  * message itself, and mqh_expected_bytes - which is used only for reads -
                                128                 :                :  * tracks the expected total size of the payload.
                                129                 :                :  *
                                130                 :                :  * mqh_counterparty_attached tracks whether we know the counterparty to have
                                131                 :                :  * attached to the queue at some previous point.  This lets us avoid some
                                132                 :                :  * mutex acquisitions.
                                133                 :                :  *
                                134                 :                :  * mqh_context is the memory context in effect at the time we attached to
                                135                 :                :  * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
                                136                 :                :  * we make sure any other allocations we do happen in this context as well,
                                137                 :                :  * to avoid nasty surprises.
                                138                 :                :  */
                                139                 :                : struct shm_mq_handle
                                140                 :                : {
                                141                 :                :     shm_mq     *mqh_queue;
                                142                 :                :     dsm_segment *mqh_segment;
                                143                 :                :     BackgroundWorkerHandle *mqh_handle;
                                144                 :                :     char       *mqh_buffer;
                                145                 :                :     Size        mqh_buflen;
                                146                 :                :     Size        mqh_consume_pending;
                                147                 :                :     Size        mqh_send_pending;
                                148                 :                :     Size        mqh_partial_bytes;
                                149                 :                :     Size        mqh_expected_bytes;
                                150                 :                :     bool        mqh_length_word_complete;
                                151                 :                :     bool        mqh_counterparty_attached;
                                152                 :                :     MemoryContext mqh_context;
                                153                 :                : };
                                154                 :                : 
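
The deferred publication that the comment attributes to mqh_send_pending can be sketched as follows. This is an illustration under stated assumptions, not the code under test: toy_sender and toy_note_written are hypothetical names, the shared counter is a plain integer, and the exact comparison (strictly more than a quarter of the ring) is an assumption made for the sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model of a sender that batches updates to shared state. */
typedef struct toy_sender
{
    uint64_t    shared_bytes_written;   /* stands in for the shared counter */
    size_t      ring_size;
    size_t      send_pending;           /* written locally, not yet published */
    unsigned    publish_count;          /* times shared state was touched */
} toy_sender;

/*
 * Record n freshly written bytes.  Publish the pending total only when it
 * exceeds a quarter of the ring (assumed threshold) or when the caller
 * forces a flush, e.g. because the queue is full.  Returns true if shared
 * state was updated.
 */
static bool
toy_note_written(toy_sender *s, size_t n, bool force_flush)
{
    assert(s->ring_size > 0);
    s->send_pending += n;

    if (force_flush || s->send_pending > s->ring_size / 4)
    {
        s->shared_bytes_written += s->send_pending;
        s->send_pending = 0;
        s->publish_count++;     /* the real code would also wake the receiver */
        return true;
    }
    return false;
}
```

With a 64-byte ring, two 8-byte writes stay private in this sketch and a third write of any size publishes all of them at once, so the shared cache line and the receiver's latch are touched once instead of three times.
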
                                155                 :                : static void shm_mq_detach_internal(shm_mq *mq);
                                156                 :                : static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
                                157                 :                :                                        const void *data, bool nowait, Size *bytes_written);
                                158                 :                : static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
                                159                 :                :                                           Size bytes_needed, bool nowait, Size *nbytesp,
                                160                 :                :                                           void **datap);
                                161                 :                : static bool shm_mq_counterparty_gone(shm_mq *mq,
                                162                 :                :                                      BackgroundWorkerHandle *handle);
                                163                 :                : static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
                                164                 :                :                                  BackgroundWorkerHandle *handle);
                                165                 :                : static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
                                166                 :                : static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
                                167                 :                : static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
                                168                 :                : 
                                169                 :                : /* Minimum queue size is enough for header and at least one chunk of data. */
                                170                 :                : const Size  shm_mq_minimum_size =
                                171                 :                : MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
                                172                 :                : 
                                173                 :                : #define MQH_INITIAL_BUFSIZE             8192
                                174                 :                : 
                                175                 :                : /*
                                176                 :                :  * Initialize a new shared message queue.
                                177                 :                :  */
                                178                 :                : shm_mq *
 4443 rhaas@postgresql.org      179                 :CBC        2956 : shm_mq_create(void *address, Size size)
                                180                 :                : {
                                181                 :           2956 :     shm_mq     *mq = address;
 4380                           182                 :           2956 :     Size        data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
                                183                 :                : 
                                184                 :                :     /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
 4443                           185                 :           2956 :     size = MAXALIGN_DOWN(size);
                                186                 :                : 
                                187                 :                :     /* Queue size must be large enough to hold some data. */
                                188         [ -  + ]:           2956 :     Assert(size > data_offset);
                                189                 :                : 
                                190                 :                :     /* Initialize queue header. */
                                191                 :           2956 :     SpinLockInit(&mq->mq_mutex);
                                192                 :           2956 :     mq->mq_receiver = NULL;
                                193                 :           2956 :     mq->mq_sender = NULL;
 2935                           194                 :           2956 :     pg_atomic_init_u64(&mq->mq_bytes_read, 0);
                                195                 :           2956 :     pg_atomic_init_u64(&mq->mq_bytes_written, 0);
 4443                           196                 :           2956 :     mq->mq_ring_size = size - data_offset;
                                197                 :           2956 :     mq->mq_detached = false;
                                198                 :           2956 :     mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
                                199                 :                : 
                                200                 :           2956 :     return mq;
                                201                 :                : }
                                202                 :                : 
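
The size arithmetic in shm_mq_create can be checked with a small stand-alone sketch. The macros below are hypothetical stand-ins for MAXALIGN and MAXALIGN_DOWN, and an alignment of 8 bytes is assumed; the header size is passed in as a parameter because the real value of offsetof(shm_mq, mq_ring) depends on the platform.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_ALIGNOF 8           /* assumed maximum alignment */

/* Round up / down to a multiple of TOY_ALIGNOF (hypothetical helpers). */
#define TOY_ALIGN_UP(len) \
    (((uintptr_t) (len) + (TOY_ALIGNOF - 1)) & ~((uintptr_t) (TOY_ALIGNOF - 1)))
#define TOY_ALIGN_DOWN(len) \
    ((uintptr_t) (len) & ~((uintptr_t) (TOY_ALIGNOF - 1)))

/*
 * Given the total bytes handed to the queue and the unaligned header size,
 * return how many bytes remain for the ring itself.
 */
static size_t
toy_ring_size(size_t total_size, size_t header_size)
{
    size_t      data_offset = (size_t) TOY_ALIGN_UP(header_size);
    size_t      usable = (size_t) TOY_ALIGN_DOWN(total_size);

    /* Mirrors the assertion in the listing: there must be room for data. */
    assert(usable > data_offset);
    return usable - data_offset;
}
```

For example, under these assumptions a 1003-byte region with a 50-byte header loses 3 trailing bytes to rounding down and 6 bytes to header padding, leaving a 944-byte ring.
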
                                203                 :                : /*
                                204                 :                :  * Set the identity of the process that will receive from a shared message
                                205                 :                :  * queue.
                                206                 :                :  */
                                207                 :                : void
                                208                 :           2956 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
                                209                 :                : {
                                210                 :                :     PGPROC     *sender;
                                211                 :                : 
                                212         [ -  + ]:           2956 :     SpinLockAcquire(&mq->mq_mutex);
 2936 andres@anarazel.de        213         [ -  + ]:           2956 :     Assert(mq->mq_receiver == NULL);
                                214                 :           2956 :     mq->mq_receiver = proc;
                                215                 :           2956 :     sender = mq->mq_sender;
 4443 rhaas@postgresql.org      216                 :           2956 :     SpinLockRelease(&mq->mq_mutex);
                                217                 :                : 
                                218         [ +  + ]:           2956 :     if (sender != NULL)
                                219                 :             19 :         SetLatch(&sender->procLatch);
                                220                 :           2956 : }
                                221                 :                : 
                                222                 :                : /*
                                223                 :                :  * Set the identity of the process that will send to a shared message queue.
                                224                 :                :  */
                                225                 :                : void
                                226                 :           2872 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
                                227                 :                : {
                                228                 :                :     PGPROC     *receiver;
                                229                 :                : 
                                230         [ +  + ]:           2872 :     SpinLockAcquire(&mq->mq_mutex);
 2936 andres@anarazel.de        231         [ -  + ]:           2872 :     Assert(mq->mq_sender == NULL);
                                232                 :           2872 :     mq->mq_sender = proc;
                                233                 :           2872 :     receiver = mq->mq_receiver;
 4443 rhaas@postgresql.org      234                 :           2872 :     SpinLockRelease(&mq->mq_mutex);
                                235                 :                : 
                                236         [ +  + ]:           2872 :     if (receiver != NULL)
                                237                 :           2853 :         SetLatch(&receiver->procLatch);
                                238                 :           2872 : }
                                239                 :                : 
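
The two setters above follow the same pattern: record our own identity, look at the counterparty slot, and wake the counterparty if it is already there. A minimal single-process sketch of that handshake, with hypothetical names (toy_proc, toy_queue) and a boolean flag in place of a process latch and no spinlock, might look like this:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins: a "process" is just a latch flag here. */
typedef struct toy_proc
{
    bool        latch_set;
} toy_proc;

typedef struct toy_queue
{
    toy_proc   *receiver;
    toy_proc   *sender;
} toy_queue;

/* Register the receiver; wake the sender if it registered first. */
static void
toy_set_receiver(toy_queue *q, toy_proc *proc)
{
    assert(q->receiver == NULL);    /* may be set only once */
    q->receiver = proc;
    if (q->sender != NULL)
        q->sender->latch_set = true;
}

/* Register the sender; wake the receiver if it registered first. */
static void
toy_set_sender(toy_queue *q, toy_proc *proc)
{
    assert(q->sender == NULL);      /* may be set only once */
    q->sender = proc;
    if (q->receiver != NULL)
        q->receiver->latch_set = true;
}
```

Whichever side registers second wakes the first, so neither side can wait indefinitely for a counterparty that has already arrived. The hit counts in the listing are consistent with the receiver usually registering first: the wake-up branch in shm_mq_set_sender ran 2853 times, against 19 times in shm_mq_set_receiver.
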
                                240                 :                : /*
                                241                 :                :  * Get the configured receiver.
                                242                 :                :  */
                                243                 :                : PGPROC *
                                244                 :              4 : shm_mq_get_receiver(shm_mq *mq)
                                245                 :                : {
                                246                 :                :     PGPROC     *receiver;
                                247                 :                : 
                                248         [ -  + ]:              4 :     SpinLockAcquire(&mq->mq_mutex);
 2936 andres@anarazel.de        249                 :              4 :     receiver = mq->mq_receiver;
 4443 rhaas@postgresql.org      250                 :              4 :     SpinLockRelease(&mq->mq_mutex);
                                251                 :                : 
                                252                 :              4 :     return receiver;
                                253                 :                : }
                                254                 :                : 
                                255                 :                : /*
                                256                 :                :  * Get the configured sender.
                                257                 :                :  */
                                258                 :                : PGPROC *
                                259                 :        3326460 : shm_mq_get_sender(shm_mq *mq)
                                260                 :                : {
                                261                 :                :     PGPROC     *sender;
                                262                 :                : 
                                263         [ +  + ]:        3326460 :     SpinLockAcquire(&mq->mq_mutex);
 2936 andres@anarazel.de        264                 :        3326460 :     sender = mq->mq_sender;
 4443 rhaas@postgresql.org      265                 :        3326460 :     SpinLockRelease(&mq->mq_mutex);
                                266                 :                : 
                                267                 :        3326460 :     return sender;
                                268                 :                : }
                                269                 :                : 
                                270                 :                : /*
                                271                 :                :  * Attach to a shared message queue so we can send or receive messages.
                                272                 :                :  *
                                273                 :                :  * The memory context in effect at the time this function is called should
                                274                 :                :  * be one which will last for at least as long as the message queue itself.
                                275                 :                :  * We'll allocate the handle in that context, and future allocations that
                                276                 :                :  * are needed to buffer incoming data will happen in that context as well.
                                277                 :                :  *
                                278                 :                :  * If seg != NULL, the queue will be automatically detached when that dynamic
                                279                 :                :  * shared memory segment is detached.
                                280                 :                :  *
                                281                 :                :  * If handle != NULL, the queue can be read or written even before the
                                282                 :                :  * other process has attached.  We'll wait for it to do so if needed.  The
                                283                 :                :  * handle must be for a background worker initialized with bgw_notify_pid
                                284                 :                :  * equal to our PID.
                                285                 :                :  *
                                286                 :                :  * shm_mq_detach() should be called when done.  This will free the
                                287                 :                :  * shm_mq_handle and mark the queue itself as detached, so that our
                                288                 :                :  * counterpart won't get stuck waiting for us to fill or drain the queue
                                289                 :                :  * after we've already lost interest.
                                290                 :                :  */
                                291                 :                : shm_mq_handle *
                                292                 :           5828 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
                                293                 :                : {
   95 michael@paquier.xyz       294                 :GNC        5828 :     shm_mq_handle *mqh = palloc_object(shm_mq_handle);
                                295                 :                : 
 4443 rhaas@postgresql.org      296   [ +  +  -  + ]:CBC        5828 :     Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
                                297                 :           5828 :     mqh->mqh_queue = mq;
                                298                 :           5828 :     mqh->mqh_segment = seg;
                                299                 :           5828 :     mqh->mqh_handle = handle;
 3118 tgl@sss.pgh.pa.us         300                 :           5828 :     mqh->mqh_buffer = NULL;
 4443 rhaas@postgresql.org      301                 :           5828 :     mqh->mqh_buflen = 0;
                                302                 :           5828 :     mqh->mqh_consume_pending = 0;
 1613                           303                 :           5828 :     mqh->mqh_send_pending = 0;
 4380                           304                 :           5828 :     mqh->mqh_partial_bytes = 0;
 3118 tgl@sss.pgh.pa.us         305                 :           5828 :     mqh->mqh_expected_bytes = 0;
 4380 rhaas@postgresql.org      306                 :           5828 :     mqh->mqh_length_word_complete = false;
 4443                           307                 :           5828 :     mqh->mqh_counterparty_attached = false;
 3118 tgl@sss.pgh.pa.us         308                 :           5828 :     mqh->mqh_context = CurrentMemoryContext;
                                309                 :                : 
 4443 rhaas@postgresql.org      310         [ +  - ]:           5828 :     if (seg != NULL)
                                311                 :           5828 :         on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
                                312                 :                : 
                                313                 :           5828 :     return mqh;
                                314                 :                : }
                                315                 :                : 
                                316                 :                : /*
                                317                 :                :  * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
                                318                 :                :  * been passed to shm_mq_attach.
                                319                 :                :  */
                                320                 :                : void
 4176                           321                 :           2836 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
                                322                 :                : {
                                323         [ -  + ]:           2836 :     Assert(mqh->mqh_handle == NULL);
                                324                 :           2836 :     mqh->mqh_handle = handle;
                                325                 :           2836 : }
                                326                 :                : 
                                327                 :                : /*
                                328                 :                :  * Write a message into a shared message queue.
                                329                 :                :  */
                                330                 :                : shm_mq_result
 1613                           331                 :        1222256 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
                                332                 :                :             bool force_flush)
                                333                 :                : {
                                334                 :                :     shm_mq_iovec iov;
                                335                 :                : 
 4176                           336                 :        1222256 :     iov.data = data;
                                337                 :        1222256 :     iov.len = nbytes;
                                338                 :                : 
 1613                           339                 :        1222256 :     return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
                                340                 :                : }
                                341                 :                : 
                                342                 :                : /*
                                343                 :                :  * Write a message into a shared message queue, gathered from multiple
                                344                 :                :  * addresses.
                                345                 :                :  *
                                346                 :                :  * When nowait = false, we'll wait on our process latch when the ring buffer
                                347                 :                :  * fills up, and then continue writing once the receiver has drained some data.
                                348                 :                :  * The process latch is reset after each wait.
                                349                 :                :  *
                                350                 :                :  * When nowait = true, we do not manipulate the state of the process latch;
                                351                 :                :  * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
                                352                 :                :  * this case, the caller should call this function again, with the same
                                353                 :                :  * arguments, each time the process latch is set.  (Once begun, the sending
                                354                 :                :  * of a message cannot be aborted except by detaching from the queue; changing
                                355                 :                :  * the length or payload will corrupt the queue.)
                                356                 :                :  *
                                357                 :                :  * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
                                358                 :                :  * and notify the receiver (if it is already attached).  Otherwise, we don't
                                359                 :                :  * update it until the amount of unflushed data exceeds 1/4 of the ring
                                360                 :                :  * size.
                                361                 :                :  */
                                362                 :                : shm_mq_result
                                363                 :        1223758 : shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
                                364                 :                :              bool force_flush)
                                365                 :                : {
                                366                 :                :     shm_mq_result res;
 4331 bruce@momjian.us          367                 :        1223758 :     shm_mq     *mq = mqh->mqh_queue;
                                368                 :                :     PGPROC     *receiver;
 4176 rhaas@postgresql.org      369                 :        1223758 :     Size        nbytes = 0;
                                370                 :                :     Size        bytes_written;
                                371                 :                :     int         i;
                                372                 :        1223758 :     int         which_iov = 0;
                                373                 :                :     Size        offset;
                                374                 :                : 
 4443                           375         [ -  + ]:        1223758 :     Assert(mq->mq_sender == MyProc);
                                376                 :                : 
                                377                 :                :     /* Compute total size of write. */
 4176                           378         [ +  + ]:        2449018 :     for (i = 0; i < iovcnt; ++i)
                                379                 :        1225260 :         nbytes += iov[i].len;
                                380                 :                : 
                                381                 :                :     /* Refuse messages too large for the receiver to accept. */
 1973 peter@eisentraut.org      382         [ -  + ]:        1223758 :     if (nbytes > MaxAllocSize)
 1973 peter@eisentraut.org      383         [ #  # ]:UBC           0 :         ereport(ERROR,
                                384                 :                :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                                385                 :                :                  errmsg("cannot send a message of size %zu via shared memory queue",
                                386                 :                :                         nbytes)));
                                387                 :                : 
                                388                 :                :     /* Try to write, or finish writing, the length word into the buffer. */
 4380 rhaas@postgresql.org      389         [ +  + ]:CBC     2443604 :     while (!mqh->mqh_length_word_complete)
                                390                 :                :     {
                                391         [ -  + ]:        1219850 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
                                392                 :        1219850 :         res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
 3189 tgl@sss.pgh.pa.us         393                 :        1219850 :                                 ((char *) &nbytes) + mqh->mqh_partial_bytes,
                                394                 :                :                                 nowait, &bytes_written);
                                395                 :                : 
 3569 rhaas@postgresql.org      396         [ +  + ]:        1219850 :         if (res == SHM_MQ_DETACHED)
                                397                 :                :         {
                                398                 :                :             /* Reset state in case caller tries to send another message. */
                                399                 :              4 :             mqh->mqh_partial_bytes = 0;
                                400                 :              4 :             mqh->mqh_length_word_complete = false;
 4443                           401                 :              4 :             return res;
                                402                 :                :         }
 3569                           403                 :        1219846 :         mqh->mqh_partial_bytes += bytes_written;
                                404                 :                : 
 4380                           405         [ +  - ]:        1219846 :         if (mqh->mqh_partial_bytes >= sizeof(Size))
                                406                 :                :         {
                                407         [ -  + ]:        1219846 :             Assert(mqh->mqh_partial_bytes == sizeof(Size));
                                408                 :                : 
                                409                 :        1219846 :             mqh->mqh_partial_bytes = 0;
                                410                 :        1219846 :             mqh->mqh_length_word_complete = true;
                                411                 :                :         }
                                412                 :                : 
 3569                           413         [ -  + ]:        1219846 :         if (res != SHM_MQ_SUCCESS)
 3569 rhaas@postgresql.org      414                 :UBC           0 :             return res;
                                415                 :                : 
                                416                 :                :         /* Length word can't be split unless bigger than required alignment. */
 4380 rhaas@postgresql.org      417         [ -  + ]:CBC     1219846 :         Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
                                418                 :                :     }
                                419                 :                : 
                                420                 :                :     /* Write the actual data bytes into the buffer. */
                                421         [ -  + ]:        1223754 :     Assert(mqh->mqh_partial_bytes <= nbytes);
 4176                           422                 :        1223754 :     offset = mqh->mqh_partial_bytes;
                                423                 :                :     do
                                424                 :                :     {
                                425                 :                :         Size        chunksize;
                                426                 :                : 
                                427                 :                :         /* Figure out which bytes need to be sent next. */
                                428         [ +  + ]:        1223767 :         if (offset >= iov[which_iov].len)
                                429                 :                :         {
                                430                 :           4004 :             offset -= iov[which_iov].len;
                                431                 :           4004 :             ++which_iov;
                                432         [ +  + ]:           4004 :             if (which_iov >= iovcnt)
                                433                 :           4000 :                 break;
                                434                 :              4 :             continue;
                                435                 :                :         }
                                436                 :                : 
                                437                 :                :         /*
                                438                 :                :          * We want to avoid copying the data if at all possible, but every
                                439                 :                :          * chunk of bytes we write into the queue has to be MAXALIGN'd, except
                                440                 :                :          * the last.  Thus, if a chunk other than the last one ends on a
                                441                 :                :          * non-MAXALIGN'd boundary, we have to combine the tail end of its
                                442                 :                :          * data with data from one or more following chunks until we either
                                443                 :                :          * reach the last chunk or accumulate a number of bytes which is
                                444                 :                :          * MAXALIGN'd.
                                445                 :                :          */
                                446         [ +  + ]:        1219763 :         if (which_iov + 1 < iovcnt &&
                                447         [ +  - ]:           1494 :             offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
                                448                 :           1494 :         {
                                449                 :                :             char        tmpbuf[MAXIMUM_ALIGNOF];
 3949 bruce@momjian.us          450                 :           1494 :             int         j = 0;
                                451                 :                : 
                                452                 :                :             for (;;)
                                453                 :                :             {
 4176 rhaas@postgresql.org      454         [ +  + ]:           4536 :                 if (offset < iov[which_iov].len)
                                455                 :                :                 {
                                456                 :           1557 :                     tmpbuf[j] = iov[which_iov].data[offset];
                                457                 :           1557 :                     j++;
                                458                 :           1557 :                     offset++;
                                459         [ +  + ]:           1557 :                     if (j == MAXIMUM_ALIGNOF)
                                460                 :              9 :                         break;
                                461                 :                :                 }
                                462                 :                :                 else
                                463                 :                :                 {
                                464                 :           2979 :                     offset -= iov[which_iov].len;
                                465                 :           2979 :                     which_iov++;
                                466         [ +  + ]:           2979 :                     if (which_iov >= iovcnt)
                                467                 :           1485 :                         break;
                                468                 :                :                 }
                                469                 :                :             }
                                470                 :                : 
                                471                 :           1494 :             res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
                                472                 :                : 
 3569                           473         [ -  + ]:           1494 :             if (res == SHM_MQ_DETACHED)
                                474                 :                :             {
                                475                 :                :                 /* Reset state in case caller tries to send another message. */
 3569 rhaas@postgresql.org      476                 :UBC           0 :                 mqh->mqh_partial_bytes = 0;
                                477                 :              0 :                 mqh->mqh_length_word_complete = false;
                                478                 :              0 :                 return res;
                                479                 :                :             }
                                480                 :                : 
 4176 rhaas@postgresql.org      481                 :CBC        1494 :             mqh->mqh_partial_bytes += bytes_written;
                                482         [ -  + ]:           1494 :             if (res != SHM_MQ_SUCCESS)
 4176 rhaas@postgresql.org      483                 :UBC           0 :                 return res;
 4176 rhaas@postgresql.org      484                 :CBC        1494 :             continue;
                                485                 :                :         }
                                486                 :                : 
                                487                 :                :         /*
                                488                 :                :          * If this is the last chunk, we can write all the data, even if it
                                489                 :                :          * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
                                490                 :                :          * MAXALIGN_DOWN the write size.
                                491                 :                :          */
                                492                 :        1218269 :         chunksize = iov[which_iov].len - offset;
                                493         [ -  + ]:        1218269 :         if (which_iov + 1 < iovcnt)
 4176 rhaas@postgresql.org      494                 :UBC           0 :             chunksize = MAXALIGN_DOWN(chunksize);
 4176 rhaas@postgresql.org      495                 :CBC     1218269 :         res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                                496                 :                :                                 nowait, &bytes_written);
                                497                 :                : 
 3569                           498         [ -  + ]:        1218269 :         if (res == SHM_MQ_DETACHED)
                                499                 :                :         {
                                500                 :                :             /* Reset state in case caller tries to send another message. */
 3569 rhaas@postgresql.org      501                 :UBC           0 :             mqh->mqh_length_word_complete = false;
                                502                 :              0 :             mqh->mqh_partial_bytes = 0;
                                503                 :              0 :             return res;
                                504                 :                :         }
                                505                 :                : 
 4176 rhaas@postgresql.org      506                 :CBC     1218269 :         mqh->mqh_partial_bytes += bytes_written;
                                507                 :        1218269 :         offset += bytes_written;
                                508         [ +  + ]:        1218269 :         if (res != SHM_MQ_SUCCESS)
                                509                 :           3908 :             return res;
                                510         [ +  + ]:        1215859 :     } while (mqh->mqh_partial_bytes < nbytes);
                                511                 :                : 
                                512                 :                :     /* Reset for next message. */
                                513                 :        1219846 :     mqh->mqh_partial_bytes = 0;
                                514                 :        1219846 :     mqh->mqh_length_word_complete = false;
                                515                 :                : 
                                516                 :                :     /* If queue has been detached, let caller know. */
 2935                           517         [ -  + ]:        1219846 :     if (mq->mq_detached)
 2935 rhaas@postgresql.org      518                 :UBC           0 :         return SHM_MQ_DETACHED;
                                519                 :                : 
                                520                 :                :     /*
                                521                 :                :      * If the counterparty is known to have attached, we can read mq_receiver
                                522                 :                :      * without acquiring the spinlock.  Otherwise, more caution is needed.
                                523                 :                :      */
 2935 rhaas@postgresql.org      524         [ +  + ]:CBC     1219846 :     if (mqh->mqh_counterparty_attached)
                                525                 :        1217241 :         receiver = mq->mq_receiver;
                                526                 :                :     else
                                527                 :                :     {
                                528         [ -  + ]:           2605 :         SpinLockAcquire(&mq->mq_mutex);
                                529                 :           2605 :         receiver = mq->mq_receiver;
                                530                 :           2605 :         SpinLockRelease(&mq->mq_mutex);
 1391                           531         [ +  - ]:           2605 :         if (receiver != NULL)
                                532                 :           2605 :             mqh->mqh_counterparty_attached = true;
                                533                 :                :     }
                                534                 :                : 
                                535                 :                :     /*
                                536                 :                :      * If the caller has requested force flush or we have written more than
                                537                 :                :      * 1/4 of the ring size, mark it as written in shared memory and notify
                                538                 :                :      * the receiver.
                                539                 :                :      */
 1613                           540   [ +  +  +  + ]:        1219846 :     if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
                                541                 :                :     {
                                542                 :         121112 :         shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
 1391                           543         [ +  - ]:         121112 :         if (receiver != NULL)
                                544                 :         121112 :             SetLatch(&receiver->procLatch);
 1613                           545                 :         121112 :         mqh->mqh_send_pending = 0;
                                546                 :                :     }
                                547                 :                : 
 2935                           548                 :        1219846 :     return SHM_MQ_SUCCESS;
                                549                 :                : }
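
The chunking rule in shm_mq_sendv (every chunk written to the queue except the last must be a multiple of MAXIMUM_ALIGNOF) can be explored outside the server with a small standalone sketch. The code below is not part of shm_mq.c. It assumes an alignment of 8 bytes, assumes every write completes in full (no SHM_MQ_WOULD_BLOCK or partial writes), and uses the hypothetical names iovec_sketch, ALIGN_SKETCH, and plan_chunks. It only records the chunk sizes that the loop above would pass to shm_mq_send_bytes.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed stand-in for MAXIMUM_ALIGNOF; must be a power of two. */
#define ALIGN_SKETCH 8

/* Hypothetical stand-in for shm_mq_iovec. */
typedef struct
{
    const char *data;
    size_t      len;
} iovec_sketch;

/*
 * Record in sizes[] the size of each chunk that would be written for the
 * concatenation of iov[0..iovcnt), assuming every write completes in full.
 * Returns the number of chunks recorded.
 */
static size_t
plan_chunks(const iovec_sketch *iov, int iovcnt, size_t *sizes,
            size_t max_chunks)
{
    size_t      total = 0;
    size_t      sent = 0;
    size_t      offset = 0;
    size_t      n = 0;
    int         which = 0;
    int         i;

    assert(iovcnt > 0);
    for (i = 0; i < iovcnt; ++i)
        total += iov[i].len;

    do
    {
        size_t      chunk;

        /* Current iovec exhausted: advance to the next one. */
        if (offset >= iov[which].len)
        {
            offset -= iov[which].len;
            if (++which >= iovcnt)
                break;
            continue;
        }

        /*
         * Fewer than ALIGN_SKETCH bytes remain in a non-final iovec: gather
         * bytes across iovecs until the chunk is aligned or input runs out.
         */
        if (which + 1 < iovcnt && offset + ALIGN_SKETCH > iov[which].len)
        {
            size_t      j = 0;

            for (;;)
            {
                if (offset < iov[which].len)
                {
                    j++;
                    offset++;
                    if (j == ALIGN_SKETCH)
                        break;
                }
                else
                {
                    offset -= iov[which].len;
                    if (++which >= iovcnt)
                        break;
                }
            }
            assert(n < max_chunks);
            sizes[n++] = j;
            sent += j;
            continue;
        }

        /* Write directly; round down unless this is the final iovec. */
        chunk = iov[which].len - offset;
        if (which + 1 < iovcnt)
            chunk &= ~((size_t) ALIGN_SKETCH - 1);
        assert(n < max_chunks);
        sizes[n++] = chunk;
        sent += chunk;
        offset += chunk;
    } while (sent < total);

    return n;
}
```

For iovecs of 5, 10, and 3 bytes, this plan gathers two 8-byte chunks across the iovec boundaries and then writes the remaining 2 bytes as the final, unaligned chunk.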
                                550                 :                : 
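
The force_flush test at the end of shm_mq_sendv batches receiver notifications: written bytes are only published, and the receiver's latch only set, once the pending amount exceeds a quarter of the ring or the caller forces a flush. The following standalone sketch models just that decision. The struct flush_sketch and the function finish_message are hypothetical, and accumulating the pending count inside finish_message is an assumption made for illustration (the listing above does not show where mqh_send_pending is incremented).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the sender-side flush state. */
typedef struct
{
    size_t      ring_size;      /* capacity of the ring buffer */
    size_t      bytes_written;  /* bytes published to the receiver */
    size_t      send_pending;   /* bytes written but not yet published */
    int         notifications;  /* times the receiver would be woken */
} flush_sketch;

/*
 * Account for one completed message of msg_bytes and decide whether to
 * publish.  Returns true if the pending bytes were flushed.
 */
static bool
finish_message(flush_sketch *q, size_t msg_bytes, bool force_flush)
{
    assert(q != NULL);

    /* Assumption: pending bytes accumulate per message in this model. */
    q->send_pending += msg_bytes;

    /* Same condition as in the listing: forced, or more than 1/4 of ring. */
    if (force_flush || q->send_pending > (q->ring_size >> 2))
    {
        q->bytes_written += q->send_pending;
        q->send_pending = 0;
        q->notifications++;     /* stands in for SetLatch() on the receiver */
        return true;
    }
    return false;
}
```

With a 1024-byte ring, three 100-byte messages sent without force_flush produce a single notification on the third message, when the pending total of 300 bytes first exceeds 256.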
                                551                 :                : /*
                                552                 :                :  * Receive a message from a shared message queue.
                                553                 :                :  *
                                554                 :                :  * We set *nbytesp to the message length and *datap to point to the message
                                555                 :                :  * payload.  If the entire message exists in the queue as a single,
                                556                 :                :  * contiguous chunk, *datap will point directly into shared memory; otherwise,
                                557                 :                :  * it will point to a temporary buffer.  This mostly avoids data copying in
                                558                 :                :  * the hoped-for case where messages are short compared to the buffer size,
                                559                 :                :  * while still allowing longer messages.  In either case, the returned pointer
                                560                 :                :  * remains valid until the next receive operation is performed on the queue.
                                561                 :                :  *
                                562                 :                :  * When nowait = false, we'll wait on our process latch when the ring buffer
                                563                 :                :  * is empty and we have not yet received a full message.  The sender will
                                564                 :                :  * set our process latch after more data has been written, and we'll resume
                                565                 :                :  * processing.  Each call will therefore return a complete message
                                566                 :                :  * (unless the sender detaches the queue).
                                567                 :                :  *
                                568                 :                :  * When nowait = true, we do not manipulate the state of the process latch;
                                569                 :                :  * instead, whenever the buffer is empty and we need to read from it, we
                                570                 :                :  * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
                                571                 :                :  * function again after the process latch has been set.
                                572                 :                :  */
                                573                 :                : shm_mq_result
 4380                           574                 :        2691174 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
                                575                 :                : {
 4331 bruce@momjian.us          576                 :        2691174 :     shm_mq     *mq = mqh->mqh_queue;
                                577                 :                :     shm_mq_result res;
                                578                 :        2691174 :     Size        rb = 0;
                                579                 :                :     Size        nbytes;
                                580                 :                :     void       *rawdata;
                                581                 :                : 
 4443 rhaas@postgresql.org      582         [ -  + ]:        2691174 :     Assert(mq->mq_receiver == MyProc);
                                583                 :                : 
                                584                 :                :     /* We can't receive data until the sender has attached. */
                                585         [ +  + ]:        2691174 :     if (!mqh->mqh_counterparty_attached)
                                586                 :                :     {
                                587         [ +  + ]:        1153753 :         if (nowait)
                                588                 :                :         {
                                589                 :                :             int         counterparty_gone;
                                590                 :                : 
                                591                 :                :             /*
                                592                 :                :              * We shouldn't return at this point at all unless the sender
                                593                 :                :              * hasn't attached yet.  However, the correct return value depends
                                594                 :                :              * on whether the sender is still attached.  If we first test
                                595                 :                :              * whether the sender has ever attached and then test whether the
                                596                 :                :              * sender has detached, there's a race condition: a sender that
                                597                 :                :              * attaches and detaches very quickly might fool us into thinking
                                598                 :                :              * the sender never attached at all.  So, test whether our
                                599                 :                :              * counterparty is definitively gone first, and only afterwards
                                600                 :                :              * check whether the sender ever attached in the first place.
                                601                 :                :              */
 3785                           602                 :        1153689 :             counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
 4443                           603         [ +  + ]:        1153689 :             if (shm_mq_get_sender(mq) == NULL)
                                604                 :                :             {
 3785                           605         [ -  + ]:        1150889 :                 if (counterparty_gone)
 3797 rhaas@postgresql.org      606                 :UBC           0 :                     return SHM_MQ_DETACHED;
                                607                 :                :                 else
 3785 rhaas@postgresql.org      608                 :CBC     1150889 :                     return SHM_MQ_WOULD_BLOCK;
                                609                 :                :             }
                                610                 :                :         }
 4337                           611         [ +  + ]:             64 :         else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
                                612         [ -  + ]:             24 :                  && shm_mq_get_sender(mq) == NULL)
                                613                 :                :         {
 4443 rhaas@postgresql.org      614                 :UBC           0 :             mq->mq_detached = true;
                                615                 :              0 :             return SHM_MQ_DETACHED;
                                616                 :                :         }
 4443 rhaas@postgresql.org      617                 :CBC        2864 :         mqh->mqh_counterparty_attached = true;
                                618                 :                :     }
                                619                 :                : 
                                620                 :                :     /*
                                621                 :                :      * If we've consumed an amount of data greater than 1/4th of the ring
                                622                 :                :      * size, mark it consumed in shared memory.  We try to avoid doing this
                                623                 :                :      * unnecessarily when only a small amount of data has been consumed,
                                624                 :                :      * because SetLatch() is fairly expensive and we don't want to do it too
                                625                 :                :      * often.
                                626                 :                :      */
 2935                           627         [ +  + ]:        1540285 :     if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
                                628                 :                :     {
 4443                           629                 :          19130 :         shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
                                630                 :          19130 :         mqh->mqh_consume_pending = 0;
                                631                 :                :     }
                                632                 :                : 
                                633                 :                :     /* Try to read, or finish reading, the length word from the buffer. */
 4380                           634         [ +  + ]:        1562081 :     while (!mqh->mqh_length_word_complete)
                                635                 :                :     {
                                636                 :                :         /* Try to receive the message length word. */
                                637         [ -  + ]:        1534152 :         Assert(mqh->mqh_partial_bytes < sizeof(Size));
 2935                           638                 :        1534152 :         res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                                639                 :                :                                    nowait, &rb, &rawdata);
 4443                           640         [ +  + ]:        1534152 :         if (res != SHM_MQ_SUCCESS)
                                641                 :         320777 :             return res;
                                642                 :                : 
                                643                 :                :         /*
                                644                 :                :          * Hopefully, we'll receive the entire message length word at once.
                                645                 :                :          * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
                                646                 :                :          * multiple reads.
                                647                 :                :          */
 4380                           648   [ +  -  +  - ]:        1213375 :         if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
 4443                           649                 :          21796 :         {
                                650                 :                :             Size        needed;
                                651                 :                : 
 4331 bruce@momjian.us          652                 :        1213375 :             nbytes = *(Size *) rawdata;
                                653                 :                : 
                                654                 :                :             /* If we've already got the whole message, we're done. */
 4380 rhaas@postgresql.org      655                 :        1213375 :             needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
                                656         [ +  + ]:        1213375 :             if (rb >= needed)
                                657                 :                :             {
 2935                           658                 :        1191579 :                 mqh->mqh_consume_pending += needed;
 4380                           659                 :        1191579 :                 *nbytesp = nbytes;
                                660                 :        1191579 :                 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
                                661                 :        1191579 :                 return SHM_MQ_SUCCESS;
                                662                 :                :             }
                                663                 :                : 
                                664                 :                :             /*
                                665                 :                :              * We don't have the whole message, but we at least have the whole
                                666                 :                :              * length word.
                                667                 :                :              */
                                668                 :          21796 :             mqh->mqh_expected_bytes = nbytes;
                                669                 :          21796 :             mqh->mqh_length_word_complete = true;
 2935                           670                 :          21796 :             mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
 4380                           671                 :          21796 :             rb -= MAXALIGN(sizeof(Size));
                                672                 :                :         }
                                673                 :                :         else
                                674                 :                :         {
                                675                 :                :             Size        lengthbytes;
                                676                 :                : 
                                677                 :                :             /* Can't be split unless bigger than required alignment. */
 4380 rhaas@postgresql.org      678                 :UBC           0 :             Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
                                679                 :                : 
                                680                 :                :             /* Message word is split; need buffer to reassemble. */
                                681                 :                :             if (mqh->mqh_buffer == NULL)
                                682                 :                :             {
                                683                 :                :                 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                                684                 :                :                                                      MQH_INITIAL_BUFSIZE);
                                685                 :                :                 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
                                686                 :                :             }
                                687                 :                :             Assert(mqh->mqh_buflen >= sizeof(Size));
                                688                 :                : 
                                689                 :                :             /* Copy partial length word; remember to consume it. */
                                690                 :                :             if (mqh->mqh_partial_bytes + rb > sizeof(Size))
                                691                 :                :                 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
                                692                 :                :             else
                                693                 :                :                 lengthbytes = rb;
                                694                 :                :             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
                                695                 :                :                    lengthbytes);
                                696                 :                :             mqh->mqh_partial_bytes += lengthbytes;
                                697                 :                :             mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
                                698                 :                :             rb -= lengthbytes;
                                699                 :                : 
                                700                 :                :             /* If we now have the whole word, we're ready to read payload. */
                                701                 :                :             if (mqh->mqh_partial_bytes >= sizeof(Size))
                                702                 :                :             {
                                703                 :                :                 Assert(mqh->mqh_partial_bytes == sizeof(Size));
                                704                 :                :                 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
                                705                 :                :                 mqh->mqh_length_word_complete = true;
                                706                 :                :                 mqh->mqh_partial_bytes = 0;
                                707                 :                :             }
                                708                 :                :         }
                                709                 :                :     }
 4380 rhaas@postgresql.org      710                 :CBC       27929 :     nbytes = mqh->mqh_expected_bytes;
                                711                 :                : 
                                712                 :                :     /*
                                713                 :                :      * Should be disallowed on the sending side already, but better check and
                                714                 :                :      * error out on the receiver side as well rather than trying to read a
                                715                 :                :      * prohibitively large message.
                                716                 :                :      */
 1973 peter@eisentraut.org      717         [ -  + ]:          27929 :     if (nbytes > MaxAllocSize)
 1973 peter@eisentraut.org      718         [ #  # ]:UBC           0 :         ereport(ERROR,
                                719                 :                :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                                720                 :                :                  errmsg("invalid message size %zu in shared memory queue",
                                721                 :                :                         nbytes)));
                                722                 :                : 
 4380 rhaas@postgresql.org      723         [ +  + ]:CBC       27929 :     if (mqh->mqh_partial_bytes == 0)
                                724                 :                :     {
                                725                 :                :         /*
                                726                 :                :          * Try to obtain the whole message in a single chunk.  If this works,
                                727                 :                :          * we need not copy the data and can return a pointer directly into
                                728                 :                :          * shared memory.
                                729                 :                :          */
 2935                           730                 :          24212 :         res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
 4443                           731         [ +  + ]:          24212 :         if (res != SHM_MQ_SUCCESS)
                                732                 :           2416 :             return res;
                                733         [ +  + ]:          21796 :         if (rb >= nbytes)
                                734                 :                :         {
 4380                           735                 :            228 :             mqh->mqh_length_word_complete = false;
 2935                           736                 :            228 :             mqh->mqh_consume_pending += MAXALIGN(nbytes);
 4443                           737                 :            228 :             *nbytesp = nbytes;
                                738                 :            228 :             *datap = rawdata;
                                739                 :            228 :             return SHM_MQ_SUCCESS;
                                740                 :                :         }
                                741                 :                : 
                                742                 :                :         /*
                                743                 :                :          * The message has wrapped the buffer.  We'll need to copy it in order
                                744                 :                :          * to return it to the client in one chunk.  First, make sure we have
                                745                 :                :          * a large enough buffer available.
                                746                 :                :          */
                                747         [ +  + ]:          21568 :         if (mqh->mqh_buflen < nbytes)
                                748                 :                :         {
                                749                 :                :             Size        newbuflen;
                                750                 :                : 
                                751                 :                :             /*
                                752                 :                :              * Increase size to the next power of 2 that's >= nbytes, but
                                753                 :                :              * limit to MaxAllocSize.
                                754                 :                :              */
 1694 tgl@sss.pgh.pa.us         755                 :            143 :             newbuflen = pg_nextpower2_size_t(nbytes);
 1973 peter@eisentraut.org      756                 :            143 :             newbuflen = Min(newbuflen, MaxAllocSize);
                                757                 :                : 
 4443 rhaas@postgresql.org      758         [ -  + ]:            143 :             if (mqh->mqh_buffer != NULL)
                                759                 :                :             {
 4443 rhaas@postgresql.org      760                 :UBC           0 :                 pfree(mqh->mqh_buffer);
                                761                 :              0 :                 mqh->mqh_buffer = NULL;
                                762                 :              0 :                 mqh->mqh_buflen = 0;
                                763                 :                :             }
 4443 rhaas@postgresql.org      764                 :CBC         143 :             mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
                                765                 :            143 :             mqh->mqh_buflen = newbuflen;
                                766                 :                :         }
                                767                 :                :     }
                                768                 :                : 
                                769                 :                :     /* Loop until we've copied the entire message. */
                                770                 :                :     for (;;)
                                771                 :          73996 :     {
                                772                 :                :         Size        still_needed;
                                773                 :                : 
                                774                 :                :         /* Copy as much as we can. */
 4380                           775         [ -  + ]:          99281 :         Assert(mqh->mqh_partial_bytes + rb <= nbytes);
 1473 tgl@sss.pgh.pa.us         776         [ +  + ]:          99281 :         if (rb > 0)
                                777                 :                :         {
                                778                 :          95564 :             memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
                                779                 :          95564 :             mqh->mqh_partial_bytes += rb;
                                780                 :                :         }
                                781                 :                : 
                                782                 :                :         /*
                                783                 :                :          * Update count of bytes that can be consumed, accounting for
                                784                 :                :          * alignment padding.  Note that this will never actually insert any
                                785                 :                :          * padding except at the end of a message, because the buffer size is
                                786                 :                :          * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
                                787                 :                :          */
 4380 rhaas@postgresql.org      788   [ +  +  -  + ]:          99281 :         Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
 2935                           789                 :          99281 :         mqh->mqh_consume_pending += MAXALIGN(rb);
                                790                 :                : 
                                791                 :                :         /* If we got all the data, exit the loop. */
 4380                           792         [ +  + ]:          99281 :         if (mqh->mqh_partial_bytes >= nbytes)
 4443                           793                 :          21568 :             break;
                                794                 :                : 
                                795                 :                :         /* Wait for some more data. */
 4380                           796                 :          77713 :         still_needed = nbytes - mqh->mqh_partial_bytes;
 2935                           797                 :          77713 :         res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
 4443                           798         [ +  + ]:          77713 :         if (res != SHM_MQ_SUCCESS)
                                799                 :           3717 :             return res;
                                800         [ +  + ]:          73996 :         if (rb > still_needed)
                                801                 :          20889 :             rb = still_needed;
                                802                 :                :     }
                                803                 :                : 
                                804                 :                :     /* Return the complete message, and reset for next message. */
                                805                 :          21568 :     *nbytesp = nbytes;
                                806                 :          21568 :     *datap = mqh->mqh_buffer;
 4380                           807                 :          21568 :     mqh->mqh_length_word_complete = false;
                                808                 :          21568 :     mqh->mqh_partial_bytes = 0;
 4443                           809                 :          21568 :     return SHM_MQ_SUCCESS;
                                810                 :                : }
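The framing arithmetic and the deferred "mark consumed" check in shm_mq_receive() can be tried in isolation. The following is a minimal sketch, not the PostgreSQL implementation: every `sketch_` name is hypothetical, the 8-byte alignment is an assumption (PostgreSQL determines MAXIMUM_ALIGNOF at build time), and a plain counter stands in for the shared-memory update and the SetLatch() call.

```c
#include <assert.h>
#include <stddef.h>

/* Assumption: 8-byte maximum alignment, chosen only for this sketch. */
#define SKETCH_ALIGNOF 8
#define SKETCH_MAXALIGN(len) \
    (((size_t) (len) + (SKETCH_ALIGNOF - 1)) & ~((size_t) (SKETCH_ALIGNOF - 1)))

/* Ring space taken by one message: aligned length word plus aligned payload. */
static size_t
sketch_framed_size(size_t nbytes)
{
    return SKETCH_MAXALIGN(sizeof(size_t)) + SKETCH_MAXALIGN(nbytes);
}

/* Hypothetical receiver-side bookkeeping, loosely modeled on the handle. */
typedef struct
{
    size_t      ring_size;          /* total ring size in bytes */
    size_t      consume_pending;    /* consumed locally, not yet published */
    size_t      bytes_read;         /* stand-in for the shared counter */
    int         flushes;            /* how often we "paid" for a publish */
} sketch_receiver;

/* Publish pending bytes only once they exceed a quarter of the ring. */
static void
sketch_maybe_flush(sketch_receiver *r)
{
    if (r->consume_pending > r->ring_size / 4)
    {
        r->bytes_read += r->consume_pending;
        r->consume_pending = 0;
        r->flushes++;
    }
}

/* Consume one message of nbytes that arrived in a single chunk. */
static void
sketch_consume(sketch_receiver *r, size_t nbytes)
{
    sketch_maybe_flush(r);
    r->consume_pending += sketch_framed_size(nbytes);
    assert(r->consume_pending % SKETCH_ALIGNOF == 0);
}
```

With a 1024-byte ring and 5-byte payloads, each message occupies 16 bytes, so the counter is published once per 17 messages rather than once per message. That batching is the point of the threshold.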
                                811                 :                : 
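The copy loop at the end of shm_mq_receive() handles a message that wraps around the end of the ring. A simplified sketch of that idea follows, assuming a byte-granular ring with no alignment padding and no waiting; `sketch_ring`, `sketch_peek` and `sketch_reassemble` are illustrative names, not part of shm_mq.c.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical ring: 'size' bytes, reader at 'read_pos', 'avail' unread. */
typedef struct
{
    const char *ring;
    size_t      size;
    size_t      read_pos;
    size_t      avail;
} sketch_ring;

/* Return the longest contiguous readable run, capped at 'want'. */
static size_t
sketch_peek(const sketch_ring *r, size_t want, const char **data)
{
    size_t      offset = r->read_pos % r->size;
    size_t      run = r->size - offset;     /* bytes before the wrap point */

    if (run > r->avail)
        run = r->avail;
    if (run > want)
        run = want;
    *data = r->ring + offset;
    return run;
}

/*
 * Copy an nbytes message into 'out', following the wrap.  Returns the
 * number of chunks used, or -1 if the ring ran dry (the real code would
 * wait or return SHM_MQ_WOULD_BLOCK at that point).
 */
static int
sketch_reassemble(sketch_ring *r, size_t nbytes, char *out)
{
    size_t      have = 0;
    int         chunks = 0;

    while (have < nbytes)
    {
        const char *data;
        size_t      rb = sketch_peek(r, nbytes - have, &data);

        if (rb == 0)
            return -1;
        memcpy(out + have, data, rb);
        have += rb;
        r->read_pos += rb;
        r->avail -= rb;
        chunks++;
    }
    return chunks;
}
```

A message that fits before the wrap point takes one chunk and, in the real function, can be returned as a pointer into shared memory without copying. Only the wrapped case needs the private buffer.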
                                812                 :                : /*
                                813                 :                :  * Wait for the other process that's supposed to use this queue to attach
                                814                 :                :  * to it.
                                815                 :                :  *
                                816                 :                :  * The return value is SHM_MQ_DETACHED if the worker has already detached or
                                817                 :                :  * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
                                818                 :                :  * Note that we will only be able to detect that the worker has died before
                                819                 :                :  * attaching if a background worker handle was passed to shm_mq_attach().
                                820                 :                :  */
                                821                 :                : shm_mq_result
 4443 rhaas@postgresql.org      822                 :UBC           0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
                                823                 :                : {
                                824                 :              0 :     shm_mq     *mq = mqh->mqh_queue;
                                825                 :                :     PGPROC    **victim;
                                826                 :                : 
                                827         [ #  # ]:              0 :     if (shm_mq_get_receiver(mq) == MyProc)
                                828                 :              0 :         victim = &mq->mq_sender;
                                829                 :                :     else
                                830                 :                :     {
                                831         [ #  # ]:              0 :         Assert(shm_mq_get_sender(mq) == MyProc);
                                832                 :              0 :         victim = &mq->mq_receiver;
                                833                 :                :     }
                                834                 :                : 
                                835         [ #  # ]:              0 :     if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
                                836                 :              0 :         return SHM_MQ_SUCCESS;
                                837                 :                :     else
                                838                 :              0 :         return SHM_MQ_DETACHED;
                                839                 :                : }
                                840                 :                : 
                                841                 :                : /*
                                842                 :                :  * Detach from a shared message queue, and destroy the shm_mq_handle.
                                843                 :                :  */
                                844                 :                : void
 3118 tgl@sss.pgh.pa.us         845                 :CBC        4229 : shm_mq_detach(shm_mq_handle *mqh)
                                846                 :                : {
                                847                 :                :     /* Before detaching, notify the receiver about any already-written data. */
 1613 rhaas@postgresql.org      848         [ +  + ]:           4229 :     if (mqh->mqh_send_pending > 0)
                                849                 :                :     {
                                850                 :           1096 :         shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
                                851                 :           1096 :         mqh->mqh_send_pending = 0;
                                852                 :                :     }
                                853                 :                : 
                                854                 :                :     /* Notify counterparty that we're outta here. */
 3118 tgl@sss.pgh.pa.us         855                 :           4229 :     shm_mq_detach_internal(mqh->mqh_queue);
                                856                 :                : 
                                857                 :                :     /* Cancel on_dsm_detach callback, if any. */
                                858         [ +  - ]:           4229 :     if (mqh->mqh_segment)
                                859                 :           4229 :         cancel_on_dsm_detach(mqh->mqh_segment,
                                860                 :                :                              shm_mq_detach_callback,
                                861                 :           4229 :                              PointerGetDatum(mqh->mqh_queue));
                                862                 :                : 
                                863                 :                :     /* Release local memory associated with handle. */
                                864         [ +  + ]:           4229 :     if (mqh->mqh_buffer != NULL)
                                865                 :            135 :         pfree(mqh->mqh_buffer);
                                866                 :           4229 :     pfree(mqh);
                                867                 :           4229 : }
                                868                 :                : 
                                869                 :                : /*
                                870                 :                :  * Notify counterparty that we're detaching from shared message queue.
                                871                 :                :  *
                                872                 :                :  * The purpose of this function is to make sure that the process
                                873                 :                :  * with which we're communicating doesn't block forever waiting for us to
                                874                 :                :  * fill or drain the queue once we've lost interest.  When the sender
                                875                 :                :  * detaches, the receiver can read any messages remaining in the queue;
                                876                 :                :  * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
                                877                 :                :  * further attempts to send messages will likewise return SHM_MQ_DETACHED.
                                878                 :                :  *
                                879                 :                :  * This is separated out from shm_mq_detach() because if the on_dsm_detach
                                880                 :                :  * callback fires, we only want to do this much.  We do not try to touch
                                881                 :                :  * the local shm_mq_handle, as it may have been pfree'd already.
                                882                 :                :  */
                                883                 :                : static void
                                884                 :           5828 : shm_mq_detach_internal(shm_mq *mq)
                                885                 :                : {
                                886                 :                :     PGPROC     *victim;
                                887                 :                : 
 4443 rhaas@postgresql.org      888         [ -  + ]:           5828 :     SpinLockAcquire(&mq->mq_mutex);
 2936 andres@anarazel.de        889         [ +  + ]:           5828 :     if (mq->mq_sender == MyProc)
                                890                 :           2872 :         victim = mq->mq_receiver;
                                891                 :                :     else
                                892                 :                :     {
                                893         [ -  + ]:           2956 :         Assert(mq->mq_receiver == MyProc);
                                894                 :           2956 :         victim = mq->mq_sender;
                                895                 :                :     }
                                896                 :           5828 :     mq->mq_detached = true;
 4443 rhaas@postgresql.org      897                 :           5828 :     SpinLockRelease(&mq->mq_mutex);
                                898                 :                : 
                                899         [ +  + ]:           5828 :     if (victim != NULL)
                                900                 :           5747 :         SetLatch(&victim->procLatch);
                                901                 :           5828 : }
                                902                 :                : 
                                903                 :                : /*
                                904                 :                :  * Get the shm_mq from handle.
                                905                 :                :  */
                                906                 :                : shm_mq *
 3831                           907                 :        2172747 : shm_mq_get_queue(shm_mq_handle *mqh)
                                908                 :                : {
                                909                 :        2172747 :     return mqh->mqh_queue;
                                910                 :                : }
                                911                 :                : 
                                912                 :                : /*
                                913                 :                :  * Write bytes into a shared message queue.
                                914                 :                :  */
                                915                 :                : static shm_mq_result
 4176                           916                 :        2439613 : shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
                                917                 :                :                   bool nowait, Size *bytes_written)
                                918                 :                : {
 4443                           919                 :        2439613 :     shm_mq     *mq = mqh->mqh_queue;
 4380                           920                 :        2439613 :     Size        sent = 0;
                                921                 :                :     uint64      used;
                                922                 :        2439613 :     Size        ringsize = mq->mq_ring_size;
                                923                 :                :     Size        available;
                                924                 :                : 
 4443                           925         [ +  + ]:        5040364 :     while (sent < nbytes)
                                926                 :                :     {
                                927                 :                :         uint64      rb;
                                928                 :                :         uint64      wb;
                                929                 :                : 
                                930                 :                :         /* Compute number of ring buffer bytes used and available. */
 2935                           931                 :        2604663 :         rb = pg_atomic_read_u64(&mq->mq_bytes_read);
 1613                           932                 :        2604663 :         wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 2935                           933         [ -  + ]:        2604663 :         Assert(wb >= rb);
                                934                 :        2604663 :         used = wb - rb;
 4443                           935         [ -  + ]:        2604663 :         Assert(used <= ringsize);
                                936                 :        2604663 :         available = Min(ringsize - used, nbytes - sent);
                                937                 :                : 
                                938                 :                :         /*
                                939                 :                :          * Bail out if the queue has been detached.  Note that we would be in
                                940                 :                :          * trouble if the compiler decided to cache the value of
                                941                 :                :          * mq->mq_detached in a register or on the stack across loop
                                942                 :                :          * iterations.  It probably shouldn't do that anyway since we'll
                                943                 :                :          * always return, call an external function that performs a system
                                944                 :                :          * call, or reach a memory barrier at some point later in the loop,
                                945                 :                :          * but just to be sure, insert a compiler barrier here.
                                946                 :                :          */
 2935                           947                 :        2604663 :         pg_compiler_barrier();
                                948         [ +  + ]:        2604663 :         if (mq->mq_detached)
                                949                 :                :         {
 4252                           950                 :              4 :             *bytes_written = sent;
 4443                           951                 :              4 :             return SHM_MQ_DETACHED;
                                952                 :                :         }
                                953                 :                : 
 3873                           954   [ +  +  +  + ]:        2604659 :         if (available == 0 && !mqh->mqh_counterparty_attached)
                                955                 :                :         {
                                956                 :                :             /*
                                957                 :                :              * The queue is full, so if the receiver isn't yet known to be
                                958                 :                :              * attached, we must wait for that to happen.
                                959                 :                :              */
                                960         [ +  + ]:              9 :             if (nowait)
                                961                 :                :             {
 3797                           962         [ -  + ]:              4 :                 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
                                963                 :                :                 {
 3797 rhaas@postgresql.org      964                 :UBC           0 :                     *bytes_written = sent;
                                965                 :              0 :                     return SHM_MQ_DETACHED;
                                966                 :                :                 }
 3873 rhaas@postgresql.org      967         [ -  + ]:CBC           4 :                 if (shm_mq_get_receiver(mq) == NULL)
                                968                 :                :                 {
 4252 rhaas@postgresql.org      969                 :UBC           0 :                     *bytes_written = sent;
 3873                           970                 :              0 :                     return SHM_MQ_WOULD_BLOCK;
                                971                 :                :                 }
                                972                 :                :             }
 3873 rhaas@postgresql.org      973         [ -  + ]:CBC           5 :             else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                                974                 :                :                                            mqh->mqh_handle))
                                975                 :                :             {
 3873 rhaas@postgresql.org      976                 :UBC           0 :                 mq->mq_detached = true;
                                977                 :              0 :                 *bytes_written = sent;
                                978                 :              0 :                 return SHM_MQ_DETACHED;
                                979                 :                :             }
 3873 rhaas@postgresql.org      980                 :CBC           9 :             mqh->mqh_counterparty_attached = true;
                                981                 :                : 
                                982                 :                :             /*
                                983                 :                :              * The receiver may have read some data after attaching, so we
                                984                 :                :              * must not wait without rechecking the queue state.
                                985                 :                :              */
                                986                 :                :         }
                                987         [ +  + ]:        2604650 :         else if (available == 0)
                                988                 :                :         {
                                989                 :                :             /* Update the pending send bytes in the shared memory. */
 1613                           990                 :          80681 :             shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
                                991                 :                : 
                                992                 :                :             /*
                                993                 :                :              * Since mqh->mqh_counterparty_attached is known to be true at this
                                994                 :                :              * point, mq_receiver has been set, and it can't change once set.
                                995                 :                :              * Therefore, we can read it without acquiring the spinlock.
                                996                 :                :              */
 2935                           997         [ -  + ]:          80681 :             Assert(mqh->mqh_counterparty_attached);
                                998                 :          80681 :             SetLatch(&mq->mq_receiver->procLatch);
                                999                 :                : 
                               1000                 :                :             /*
                               1001                 :                :              * We have just updated the mqh_send_pending bytes in the shared
                               1002                 :                :              * memory so reset it.
                               1003                 :                :              */
 1613                          1004                 :          80681 :             mqh->mqh_send_pending = 0;
                               1005                 :                : 
                               1006                 :                :             /* Skip manipulation of our latch if nowait = true. */
 4443                          1007         [ +  + ]:          80681 :             if (nowait)
                               1008                 :                :             {
                               1009                 :           3908 :                 *bytes_written = sent;
                               1010                 :           3908 :                 return SHM_MQ_WOULD_BLOCK;
                               1011                 :                :             }
                               1012                 :                : 
                               1013                 :                :             /*
                               1014                 :                :              * Wait for our latch to be set.  It might already be set for some
                               1015                 :                :              * unrelated reason, but that'll just result in one extra trip
                               1016                 :                :              * through the loop.  It's worth it to avoid resetting the latch
                               1017                 :                :              * at top of loop, because setting an already-set latch is much
                               1018                 :                :              * cheaper than setting one that has been reset.
                               1019                 :                :              */
 2669 tmunro@postgresql.or     1020                 :          76773 :             (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                               1021                 :                :                              WAIT_EVENT_MESSAGE_QUEUE_SEND);
                               1022                 :                : 
                               1023                 :                :             /* Reset the latch so we don't spin. */
 4078 andres@anarazel.de       1024                 :          76773 :             ResetLatch(MyLatch);
                               1025                 :                : 
                               1026                 :                :             /* An interrupt may have occurred while we were waiting. */
 3513 tgl@sss.pgh.pa.us        1027         [ -  + ]:          76773 :             CHECK_FOR_INTERRUPTS();
                               1028                 :                :         }
                               1029                 :                :         else
                               1030                 :                :         {
                               1031                 :                :             Size        offset;
                               1032                 :                :             Size        sendnow;
                               1033                 :                : 
 2935 rhaas@postgresql.org     1034                 :        2523969 :             offset = wb % (uint64) ringsize;
                               1035                 :        2523969 :             sendnow = Min(available, ringsize - offset);
                               1036                 :                : 
                               1037                 :                :             /*
                               1038                 :                :              * Write as much data as we can via a single memcpy(). Make sure
                               1039                 :                :              * these writes happen after the read of mq_bytes_read, above.
                               1040                 :                :              * This barrier pairs with the one in shm_mq_inc_bytes_read.
                               1041                 :                :              * (Since we're separating the read of mq_bytes_read from a
                               1042                 :                :              * subsequent write to mq_ring, we need a full barrier here.)
                               1043                 :                :              */
                               1044                 :        2523969 :             pg_memory_barrier();
 4443                          1045                 :        2523969 :             memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
                               1046                 :                :                    (const char *) data + sent, sendnow);
                               1047                 :        2523969 :             sent += sendnow;
                               1048                 :                : 
                               1049                 :                :             /*
                               1050                 :                :              * Update count of bytes written, with alignment padding.  Note
                               1051                 :                :              * that this will never actually insert any padding except at the
                               1052                 :                :              * end of a run of bytes, because the buffer size is a multiple of
                               1053                 :                :              * MAXIMUM_ALIGNOF, and each read is as well.
                               1054                 :                :              */
 4380                          1055   [ +  +  -  + ]:        2523969 :             Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
                               1056                 :                : 
                               1057                 :                :             /*
                               1058                 :                :              * For efficiency, we don't update the bytes written in the shared
                               1059                 :                :              * memory and also don't set the reader's latch here.  Refer to
                               1060                 :                :              * the comments atop the shm_mq_handle structure for more
                               1061                 :                :              * information.
                               1062                 :                :              */
 1613                          1063                 :        2523969 :             mqh->mqh_send_pending += MAXALIGN(sendnow);
                               1064                 :                :         }
                               1065                 :                :     }
                               1066                 :                : 
 4443                          1067                 :        2435701 :     *bytes_written = sent;
                               1068                 :        2435701 :     return SHM_MQ_SUCCESS;
                               1069                 :                : }
                               1070                 :                : 
                               1071                 :                : /*
                               1072                 :                :  * Wait until at least *nbytesp bytes are available to be read from the
                               1073                 :                :  * shared message queue, or until the buffer wraps around.  If the queue is
                               1074                 :                :  * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
                               1075                 :                :  * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
                               1076                 :                :  * to the location at which data bytes can be read, *nbytesp is set to the
                               1077                 :                :  * number of bytes which can be read at that address, and the return value
                               1078                 :                :  * is SHM_MQ_SUCCESS.
                               1079                 :                :  */
                               1080                 :                : static shm_mq_result
 2935                          1081                 :        1636077 : shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
                               1082                 :                :                      Size *nbytesp, void **datap)
                               1083                 :                : {
                               1084                 :        1636077 :     shm_mq     *mq = mqh->mqh_queue;
 4380                          1085                 :        1636077 :     Size        ringsize = mq->mq_ring_size;
                               1086                 :                :     uint64      used;
                               1087                 :                :     uint64      written;
                               1088                 :                : 
                               1089                 :                :     for (;;)
 4443                          1090                 :         112397 :     {
                               1091                 :                :         Size        offset;
                               1092                 :                :         uint64      read;
                               1093                 :                : 
                               1094                 :                :         /* Get bytes written, so we can compute what's available to read. */
 2935                          1095                 :        1748474 :         written = pg_atomic_read_u64(&mq->mq_bytes_written);
                               1096                 :                : 
                               1097                 :                :         /*
                               1098                 :                :          * Get bytes read.  Include bytes we could consume but have not yet
                               1099                 :                :          * consumed.
                               1100                 :                :          */
                               1101                 :        1748474 :         read = pg_atomic_read_u64(&mq->mq_bytes_read) +
                               1102                 :        1748474 :             mqh->mqh_consume_pending;
                               1103                 :        1748474 :         used = written - read;
 4443                          1104         [ -  + ]:        1748474 :         Assert(used <= ringsize);
 2935                          1105                 :        1748474 :         offset = read % (uint64) ringsize;
                               1106                 :                : 
                               1107                 :                :         /* If we have enough data or buffer has wrapped, we're done. */
 4443                          1108   [ +  +  +  + ]:        1748474 :         if (used >= bytes_needed || offset + used >= ringsize)
                               1109                 :                :         {
                               1110                 :        1309167 :             *nbytesp = Min(used, ringsize - offset);
                               1111                 :        1309167 :             *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
                               1112                 :                : 
                               1113                 :                :             /*
                               1114                 :                :              * Separate the read of mq_bytes_written, above, from caller's
                               1115                 :                :              * attempt to read the data itself.  Pairs with the barrier in
                               1116                 :                :              * shm_mq_inc_bytes_written.
                               1117                 :                :              */
 2935                          1118                 :        1309167 :             pg_read_barrier();
 4443                          1119                 :        1309167 :             return SHM_MQ_SUCCESS;
                               1120                 :                :         }
                               1121                 :                : 
                               1122                 :                :         /*
                               1123                 :                :          * Fall out before waiting if the queue has been detached.
                               1124                 :                :          *
                               1125                 :                :          * Note that we don't check for this until *after* considering whether
                               1126                 :                :          * the data already available is enough, since the receiver can finish
                               1127                 :                :          * receiving a message stored in the buffer even after the sender has
                               1128                 :                :          * detached.
                               1129                 :                :          */
 2935                          1130         [ +  + ]:         439307 :         if (mq->mq_detached)
                               1131                 :                :         {
                               1132                 :                :             /*
                               1133                 :                :              * If the writer advanced mq_bytes_written and then set
                               1134                 :                :              * mq_detached, we might not have read the final value of
                               1135                 :                :              * mq_bytes_written above.  Insert a read barrier and then check
                               1136                 :                :              * again if mq_bytes_written has advanced.
                               1137                 :                :              */
 2932                          1138                 :           1344 :             pg_read_barrier();
                               1139         [ -  + ]:           1344 :             if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
 2932 rhaas@postgresql.org     1140                 :UBC           0 :                 continue;
                               1141                 :                : 
 4443 rhaas@postgresql.org     1142                 :CBC        1344 :             return SHM_MQ_DETACHED;
                               1143                 :                :         }
                               1144                 :                : 
                               1145                 :                :         /*
                               1146                 :                :          * We didn't get enough data to satisfy the request, so mark any data
                               1147                 :                :          * previously-consumed as read to make more buffer space.
                               1148                 :                :          */
 2935                          1149         [ +  + ]:         437963 :         if (mqh->mqh_consume_pending > 0)
                               1150                 :                :         {
                               1151                 :          96795 :             shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
                               1152                 :          96795 :             mqh->mqh_consume_pending = 0;
                               1153                 :                :         }
                               1154                 :                : 
                               1155                 :                :         /* Skip manipulation of our latch if nowait = true. */
 4443                          1156         [ +  + ]:         437963 :         if (nowait)
                               1157                 :         325566 :             return SHM_MQ_WOULD_BLOCK;
                               1158                 :                : 
                               1159                 :                :         /*
                               1160                 :                :          * Wait for our latch to be set.  It might already be set for some
                               1161                 :                :          * unrelated reason, but that'll just result in one extra trip through
                               1162                 :                :          * the loop.  It's worth it to avoid resetting the latch at top of
                               1163                 :                :          * loop, because setting an already-set latch is much cheaper than
                               1164                 :                :          * setting one that has been reset.
                               1165                 :                :          */
 2669 tmunro@postgresql.or     1166                 :         112397 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                               1167                 :                :                          WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);
                               1168                 :                : 
                               1169                 :                :         /* Reset the latch so we don't spin. */
 4078 andres@anarazel.de       1170                 :         112397 :         ResetLatch(MyLatch);
                               1171                 :                : 
                               1172                 :                :         /* An interrupt may have occurred while we were waiting. */
 3513 tgl@sss.pgh.pa.us        1173         [ +  + ]:         112397 :         CHECK_FOR_INTERRUPTS();
                               1174                 :                :     }
                               1175                 :                : }
                               1176                 :                : 
                               1177                 :                : /*
                               1178                 :                :  * Test whether a counterparty who may not even be alive yet is definitely gone.
                               1179                 :                :  */
                               1180                 :                : static bool
 2936 andres@anarazel.de       1181                 :        1153693 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
                               1182                 :                : {
                               1183                 :                :     pid_t       pid;
                               1184                 :                : 
                               1185                 :                :     /* If the queue has been detached, counterparty is definitely gone. */
 2935 rhaas@postgresql.org     1186         [ +  + ]:        1153693 :     if (mq->mq_detached)
 3797                          1187                 :           1087 :         return true;
                               1188                 :                : 
                               1189                 :                :     /* If there's a handle, check worker status. */
                               1190         [ +  + ]:        1152606 :     if (handle != NULL)
                               1191                 :                :     {
                               1192                 :                :         BgwHandleStatus status;
                               1193                 :                : 
                               1194                 :                :         /* Check for unexpected worker death. */
                               1195                 :        1152587 :         status = GetBackgroundWorkerPid(handle, &pid);
                               1196   [ +  +  -  + ]:        1152587 :         if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
                               1197                 :                :         {
                               1198                 :                :             /* Mark it detached, just to make it official. */
 3797 rhaas@postgresql.org     1199                 :UBC           0 :             mq->mq_detached = true;
                               1200                 :              0 :             return true;
                               1201                 :                :         }
                               1202                 :                :     }
                               1203                 :                : 
                               1204                 :                :     /* Counterparty is not definitively gone. */
 3797 rhaas@postgresql.org     1205                 :CBC     1152606 :     return false;
                               1206                 :                : }
                               1207                 :                : 
                               1208                 :                : /*
                               1209                 :                :  * This is used when a process is waiting for its counterpart to attach to the
                               1210                 :                :  * queue.  We exit when the other process attaches as expected, or, if
                               1211                 :                :  * handle != NULL, when the referenced background process or the postmaster
                               1212                 :                :  * dies.  Note that if handle == NULL, and the process fails to attach, we'll
                               1213                 :                :  * potentially get stuck here forever waiting for a process that may never
                               1214                 :                :  * start.  We do check for interrupts, though.
                               1215                 :                :  *
                               1216                 :                :  * ptr is a pointer to the memory address that we're expecting to become
                               1217                 :                :  * non-NULL when our counterpart attaches to the queue.
                               1218                 :                :  */
                               1219                 :                : static bool
 2936 andres@anarazel.de       1220                 :             69 : shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
                               1221                 :                : {
 4331 bruce@momjian.us         1222                 :             69 :     bool        result = false;
                               1223                 :                : 
                               1224                 :                :     for (;;)
 4443 rhaas@postgresql.org     1225                 :            100 :     {
                               1226                 :                :         BgwHandleStatus status;
                               1227                 :                :         pid_t       pid;
                               1228                 :                : 
                               1229                 :                :         /* Acquire the lock just long enough to check the pointer. */
 3810                          1230         [ -  + ]:            169 :         SpinLockAcquire(&mq->mq_mutex);
                               1231                 :            169 :         result = (*ptr != NULL);
                               1232                 :            169 :         SpinLockRelease(&mq->mq_mutex);
                               1233                 :                : 
                               1234                 :                :         /* Fail if detached; else succeed if initialized. */
 2935                          1235         [ +  + ]:            169 :         if (mq->mq_detached)
                               1236                 :                :         {
 3810                          1237                 :             24 :             result = false;
                               1238                 :             24 :             break;
                               1239                 :                :         }
                               1240         [ +  + ]:            145 :         if (result)
                               1241                 :             45 :             break;
                               1242                 :                : 
                               1243         [ +  - ]:            100 :         if (handle != NULL)
                               1244                 :                :         {
                               1245                 :                :             /* Check for unexpected worker death. */
                               1246                 :            100 :             status = GetBackgroundWorkerPid(handle, &pid);
                               1247   [ +  +  -  + ]:            100 :             if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
                               1248                 :                :             {
 4443 rhaas@postgresql.org     1249                 :UBC           0 :                 result = false;
                               1250                 :              0 :                 break;
                               1251                 :                :             }
                               1252                 :                :         }
                               1253                 :                : 
                               1254                 :                :         /* Wait to be signaled. */
 2669 tmunro@postgresql.or     1255                 :CBC         100 :         (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
                               1256                 :                :                          WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);
                               1257                 :                : 
                               1258                 :                :         /* Reset the latch so we don't spin. */
 3810 rhaas@postgresql.org     1259                 :            100 :         ResetLatch(MyLatch);
                               1260                 :                : 
                               1261                 :                :         /* An interrupt may have occurred while we were waiting. */
 3513 tgl@sss.pgh.pa.us        1262         [ +  + ]:            100 :         CHECK_FOR_INTERRUPTS();
                               1263                 :                :     }
                               1264                 :                : 
 4443 rhaas@postgresql.org     1265                 :             69 :     return result;
                               1266                 :                : }
                               1267                 :                : 
                               1268                 :                : /*
                               1269                 :                :  * Increment the number of bytes read.
                               1270                 :                :  */
                               1271                 :                : static void
 2936 andres@anarazel.de       1272                 :         115925 : shm_mq_inc_bytes_read(shm_mq *mq, Size n)
                               1273                 :                : {
                               1274                 :                :     PGPROC     *sender;
                               1275                 :                : 
                               1276                 :                :     /*
                               1277                 :                :      * Separate prior reads of mq_ring from the increment of mq_bytes_read
                               1278                 :                :      * which follows.  This pairs with the full barrier in
                               1279                 :                :      * shm_mq_send_bytes(). We only need a read barrier here because the
                               1280                 :                :      * increment of mq_bytes_read is actually a read followed by a dependent
                               1281                 :                :      * write.
                               1282                 :                :      */
 2935 rhaas@postgresql.org     1283                 :         115925 :     pg_read_barrier();
                               1284                 :                : 
                               1285                 :                :     /*
                               1286                 :                :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
                               1287                 :                :      * else can be changing this value.  This method should be cheaper.
                               1288                 :                :      */
                               1289                 :         115925 :     pg_atomic_write_u64(&mq->mq_bytes_read,
                               1290                 :         115925 :                         pg_atomic_read_u64(&mq->mq_bytes_read) + n);
                               1291                 :                : 
                               1292                 :                :     /*
                               1293                 :                :      * We shouldn't have any bytes to read without a sender, so we can read
                               1294                 :                :      * mq_sender here without a lock.  Once it's initialized, it can't change.
                               1295                 :                :      */
 4443                          1296                 :         115925 :     sender = mq->mq_sender;
                               1297         [ -  + ]:         115925 :     Assert(sender != NULL);
                               1298                 :         115925 :     SetLatch(&sender->procLatch);
                               1299                 :         115925 : }
                               1300                 :                : 
                               1301                 :                : /*
                               1302                 :                :  * Increment the number of bytes written.
                               1303                 :                :  */
                               1304                 :                : static void
 2936 andres@anarazel.de       1305                 :         202889 : shm_mq_inc_bytes_written(shm_mq *mq, Size n)
                               1306                 :                : {
                               1307                 :                :     /*
                               1308                 :                :      * Separate prior writes of mq_ring from the write of mq_bytes_written
                               1309                 :                :      * which we're about to do.  Pairs with the read barrier found in
                               1310                 :                :      * shm_mq_receive_bytes.
                               1311                 :                :      */
 2935 rhaas@postgresql.org     1312                 :         202889 :     pg_write_barrier();
                               1313                 :                : 
                               1314                 :                :     /*
                               1315                 :                :      * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
                               1316                 :                :      * else can be changing this value.  This method avoids taking the bus
                               1317                 :                :      * lock unnecessarily.
                               1318                 :                :      */
                               1319                 :         202889 :     pg_atomic_write_u64(&mq->mq_bytes_written,
                               1320                 :         202889 :                         pg_atomic_read_u64(&mq->mq_bytes_written) + n);
 4443                          1321                 :         202889 : }
                               1322                 :                : 
                               1323                 :                : /* Shim for on_dsm_detach callback. */
                               1324                 :                : static void
                               1325                 :           1599 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
                               1326                 :                : {
                               1327                 :           1599 :     shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);
                               1328                 :                : 
 3118 tgl@sss.pgh.pa.us        1329                 :           1599 :     shm_mq_detach_internal(mq);
 4443 rhaas@postgresql.org     1330                 :           1599 : }
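
The comments in shm_mq_inc_bytes_read() and shm_mq_inc_bytes_written() rely on each counter having exactly one writer, so a plain read followed by a plain write is enough and no atomic read-modify-write is needed. The following is a minimal sketch of that idea, not the PostgreSQL implementation: it substitutes portable C11 atomics with release/acquire ordering for pg_write_barrier() and pg_atomic_write_u64(), and the names demo_counter, demo_inc_bytes_written, and demo_read_bytes_written are hypothetical.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-in for a queue byte counter; not PostgreSQL's shm_mq. */
typedef struct
{
    _Atomic uint64_t bytes_written;
} demo_counter;

/*
 * Single-writer increment: an atomic load followed by an atomic store,
 * instead of atomic_fetch_add.  This is only safe under the assumption
 * that exactly one thread or process ever writes the counter.
 *
 * The release store keeps earlier writes (for example, payload copied
 * into a ring buffer) ordered before the counter update becomes visible.
 */
static void
demo_inc_bytes_written(demo_counter *c, uint64_t n)
{
    uint64_t cur = atomic_load_explicit(&c->bytes_written,
                                        memory_order_relaxed);

    atomic_store_explicit(&c->bytes_written, cur + n,
                          memory_order_release);
}

/*
 * Reader side: the acquire load pairs with the release store above, so
 * a reader that observes the new count also observes the payload writes
 * that preceded it.
 */
static uint64_t
demo_read_bytes_written(demo_counter *c)
{
    return atomic_load_explicit(&c->bytes_written, memory_order_acquire);
}
```

A caller would initialize the counter with atomic_init(&c.bytes_written, 0), have the single writer call demo_inc_bytes_written() after filling the buffer, and have the reader call demo_read_bytes_written() before consuming data. If a second writer were ever possible, the load/store pair would lose updates and atomic_fetch_add would be required instead.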
        
