Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * shm_mq.c
4 : : * single-reader, single-writer shared memory message queue
5 : : *
6 : : * Both the sender and the receiver must have a PGPROC; their respective
7 : : * process latches are used for synchronization. Only the sender may send,
8 : : * and only the receiver may receive. This is intended to allow a user
9 : : * backend to communicate with worker backends that it has registered.
10 : : *
11 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
12 : : * Portions Copyright (c) 1994, Regents of the University of California
13 : : *
14 : : * src/backend/storage/ipc/shm_mq.c
15 : : *
16 : : *-------------------------------------------------------------------------
17 : : */
18 : :
19 : : #include "postgres.h"
20 : :
21 : : #include "miscadmin.h"
22 : : #include "pgstat.h"
23 : : #include "port/pg_bitutils.h"
24 : : #include "postmaster/bgworker.h"
25 : : #include "storage/proc.h"
26 : : #include "storage/shm_mq.h"
27 : : #include "storage/spin.h"
28 : : #include "utils/memutils.h"
29 : : #include "utils/wait_event.h"
30 : :
31 : : /*
32 : : * This structure represents the actual queue, stored in shared memory.
33 : : *
34 : : * Some notes on synchronization:
35 : : *
36 : : * mq_receiver and mq_bytes_read can only be changed by the receiver; and
37 : : * mq_sender and mq_bytes_written can only be changed by the sender.
38 : : * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
39 : : * they cannot change once set, and thus may be read without a lock once this
40 : : * is known to be the case.
41 : : *
42 : : * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
43 : : * they are written atomically using 8 byte loads and stores. Memory barriers
44 : : * must be carefully used to synchronize reads and writes of these values with
45 : : * reads and writes of the actual data in mq_ring.
46 : : *
47 : : * mq_detached needs no locking. It can be set by either the sender or the
48 : : * receiver, but only ever from false to true, so redundant writes don't
49 : : * matter. It is important that if we set mq_detached and then set the
50 : : * counterparty's latch, the counterparty must be certain to see the change
51 : : * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
52 : : * ends with one, this should be OK.
53 : : *
54 : : * mq_ring_size and mq_ring_offset never change after initialization, and
55 : : * can therefore be read without the lock.
56 : : *
57 : : * Importantly, mq_ring can be safely read and written without a lock.
58 : : * At any given time, the difference between mq_bytes_read and
59 : : * mq_bytes_written defines the number of bytes within mq_ring that contain
60 : : * unread data, and mq_bytes_read defines the position where those bytes
61 : : * begin. The sender can increase the number of unread bytes at any time,
62 : : * but only the receiver can give license to overwrite those bytes, by
63 : : * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
64 : : * the unread bytes it knows to be present without the lock. Conversely,
65 : : * the sender can write to the unused portion of the ring buffer without
66 : : * the lock, because nobody else can be reading or writing those bytes. The
67 : : * receiver could be making more bytes unused by incrementing mq_bytes_read,
68 : : * but that's OK. Note that it would be unsafe for the receiver to read any
69 : : * data it's already marked as read, or to write any data; and it would be
70 : : * unsafe for the sender to reread any data after incrementing
71 : : * mq_bytes_written, but fortunately there's no need for any of that.
72 : : */
struct shm_mq
{
	slock_t		mq_mutex;		/* protects mq_receiver and mq_sender */
	PGPROC	   *mq_receiver;	/* receiving process; set once, then immutable */
	PGPROC	   *mq_sender;		/* sending process; set once, then immutable */
	pg_atomic_uint64 mq_bytes_read; /* consumed byte count; written only by
									 * the receiver, read atomically */
	pg_atomic_uint64 mq_bytes_written;	/* produced byte count; written only
										 * by the sender, read atomically */
	Size		mq_ring_size;	/* usable ring size; fixed after creation */
	bool		mq_detached;	/* set false->true only, by either side */
	uint8		mq_ring_offset; /* padding between header and ring start;
								 * fixed after creation */
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* the ring buffer proper */
};
85 : :
86 : : /*
87 : : * This structure is a backend-private handle for access to a queue.
88 : : *
89 : : * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
90 : : * an optional pointer to the dynamic shared memory segment that contains it.
91 : : * (If mqh_segment is provided, we register an on_dsm_detach callback to
92 : : * make sure we detach from the queue before detaching from DSM.)
93 : : *
94 : : * If this queue is intended to connect the current process with a background
95 : : * worker that started it, the user can pass a pointer to the worker handle
96 : : * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
97 : : * is to allow us to begin sending to or receiving from that queue before the
98 : : * process we'll be communicating with has even been started. If it fails
99 : : * to start, the handle will allow us to notice that and fail cleanly, rather
100 : : * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
101 : : * simple cases - e.g. where there are just 2 processes communicating; in
102 : : * more complex scenarios, every process may not have a BackgroundWorkerHandle
103 : : * available, or may need to watch for the failure of more than one other
104 : : * process at a time.
105 : : *
106 : : * When a message exists as a contiguous chunk of bytes in the queue - that is,
107 : : * it is smaller than the size of the ring buffer and does not wrap around
108 : : * the end - we return the message to the caller as a pointer into the buffer.
109 : : * For messages that are larger or happen to wrap, we reassemble the message
110 : : * locally by copying the chunks into a backend-local buffer. mqh_buffer is
111 : : * the buffer, and mqh_buflen is the number of bytes allocated for it.
112 : : *
 * mqh_send_pending is the number of bytes that have been written to the queue
 * but not yet updated in shared memory.  We will not update it until the
 * written data is at least 1/4th of the ring size or the queue is full.  This will
116 : : * prevent frequent CPU cache misses, and it will also avoid frequent
117 : : * SetLatch() calls, which are quite expensive.
118 : : *
119 : : * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
120 : : * are used to track the state of non-blocking operations. When the caller
121 : : * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
122 : : * are expected to retry the call at a later time with the same argument;
123 : : * we need to retain enough state to pick up where we left off.
124 : : * mqh_length_word_complete tracks whether we are done sending or receiving
125 : : * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
126 : : * the number of bytes read or written for either the length word or the
127 : : * message itself, and mqh_expected_bytes - which is used only for reads -
128 : : * tracks the expected total size of the payload.
129 : : *
130 : : * mqh_counterparty_attached tracks whether we know the counterparty to have
131 : : * attached to the queue at some previous point. This lets us avoid some
132 : : * mutex acquisitions.
133 : : *
134 : : * mqh_context is the memory context in effect at the time we attached to
135 : : * the shm_mq. The shm_mq_handle itself is allocated in this context, and
136 : : * we make sure any other allocations we do happen in this context as well,
137 : : * to avoid nasty surprises.
138 : : */
struct shm_mq_handle
{
	shm_mq	   *mqh_queue;		/* underlying queue in shared memory */
	dsm_segment *mqh_segment;	/* DSM segment containing queue, or NULL */
	BackgroundWorkerHandle *mqh_handle; /* counterpart worker's handle, or NULL */
	char	   *mqh_buffer;		/* backend-local buffer for reassembling
								 * messages that wrap the ring */
	Size		mqh_buflen;		/* allocated size of mqh_buffer */
	Size		mqh_consume_pending;	/* bytes consumed locally but not yet
										 * reflected in mq_bytes_read */
	Size		mqh_send_pending;	/* bytes written locally but not yet
									 * reflected in mq_bytes_written */
	Size		mqh_partial_bytes;	/* bytes sent/received of the current
									 * length word or payload */
	Size		mqh_expected_bytes; /* expected payload size (receives only) */
	bool		mqh_length_word_complete;	/* done transferring length word? */
	bool		mqh_counterparty_attached;	/* other side known to have
											 * attached? */
	MemoryContext mqh_context;	/* context owning this handle and mqh_buffer */
};
154 : :
/* Forward declarations for routines local to this file. */
static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
									   const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
										  Size bytes_needed, bool nowait, Size *nbytesp,
										  void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
									 BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
								 BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size	shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

/* Initial size of the backend-local reassembly buffer; grown as needed. */
#define MQH_INITIAL_BUFSIZE 8192
174 : :
175 : : /*
176 : : * Initialize a new shared message queue.
177 : : */
178 : : shm_mq *
4443 rhaas@postgresql.org 179 :CBC 2956 : shm_mq_create(void *address, Size size)
180 : : {
181 : 2956 : shm_mq *mq = address;
4380 182 : 2956 : Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
183 : :
184 : : /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
4443 185 : 2956 : size = MAXALIGN_DOWN(size);
186 : :
187 : : /* Queue size must be large enough to hold some data. */
188 [ - + ]: 2956 : Assert(size > data_offset);
189 : :
190 : : /* Initialize queue header. */
191 : 2956 : SpinLockInit(&mq->mq_mutex);
192 : 2956 : mq->mq_receiver = NULL;
193 : 2956 : mq->mq_sender = NULL;
2935 194 : 2956 : pg_atomic_init_u64(&mq->mq_bytes_read, 0);
195 : 2956 : pg_atomic_init_u64(&mq->mq_bytes_written, 0);
4443 196 : 2956 : mq->mq_ring_size = size - data_offset;
197 : 2956 : mq->mq_detached = false;
198 : 2956 : mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
199 : :
200 : 2956 : return mq;
201 : : }
202 : :
203 : : /*
204 : : * Set the identity of the process that will receive from a shared message
205 : : * queue.
206 : : */
207 : : void
208 : 2956 : shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
209 : : {
210 : : PGPROC *sender;
211 : :
212 [ - + ]: 2956 : SpinLockAcquire(&mq->mq_mutex);
2936 andres@anarazel.de 213 [ - + ]: 2956 : Assert(mq->mq_receiver == NULL);
214 : 2956 : mq->mq_receiver = proc;
215 : 2956 : sender = mq->mq_sender;
4443 rhaas@postgresql.org 216 : 2956 : SpinLockRelease(&mq->mq_mutex);
217 : :
218 [ + + ]: 2956 : if (sender != NULL)
219 : 19 : SetLatch(&sender->procLatch);
220 : 2956 : }
221 : :
222 : : /*
223 : : * Set the identity of the process that will send to a shared message queue.
224 : : */
225 : : void
226 : 2872 : shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
227 : : {
228 : : PGPROC *receiver;
229 : :
230 [ + + ]: 2872 : SpinLockAcquire(&mq->mq_mutex);
2936 andres@anarazel.de 231 [ - + ]: 2872 : Assert(mq->mq_sender == NULL);
232 : 2872 : mq->mq_sender = proc;
233 : 2872 : receiver = mq->mq_receiver;
4443 rhaas@postgresql.org 234 : 2872 : SpinLockRelease(&mq->mq_mutex);
235 : :
236 [ + + ]: 2872 : if (receiver != NULL)
237 : 2853 : SetLatch(&receiver->procLatch);
238 : 2872 : }
239 : :
240 : : /*
241 : : * Get the configured receiver.
242 : : */
243 : : PGPROC *
244 : 4 : shm_mq_get_receiver(shm_mq *mq)
245 : : {
246 : : PGPROC *receiver;
247 : :
248 [ - + ]: 4 : SpinLockAcquire(&mq->mq_mutex);
2936 andres@anarazel.de 249 : 4 : receiver = mq->mq_receiver;
4443 rhaas@postgresql.org 250 : 4 : SpinLockRelease(&mq->mq_mutex);
251 : :
252 : 4 : return receiver;
253 : : }
254 : :
255 : : /*
256 : : * Get the configured sender.
257 : : */
258 : : PGPROC *
259 : 3326460 : shm_mq_get_sender(shm_mq *mq)
260 : : {
261 : : PGPROC *sender;
262 : :
263 [ + + ]: 3326460 : SpinLockAcquire(&mq->mq_mutex);
2936 andres@anarazel.de 264 : 3326460 : sender = mq->mq_sender;
4443 rhaas@postgresql.org 265 : 3326460 : SpinLockRelease(&mq->mq_mutex);
266 : :
267 : 3326460 : return sender;
268 : : }
269 : :
270 : : /*
271 : : * Attach to a shared message queue so we can send or receive messages.
272 : : *
273 : : * The memory context in effect at the time this function is called should
274 : : * be one which will last for at least as long as the message queue itself.
275 : : * We'll allocate the handle in that context, and future allocations that
276 : : * are needed to buffer incoming data will happen in that context as well.
277 : : *
278 : : * If seg != NULL, the queue will be automatically detached when that dynamic
279 : : * shared memory segment is detached.
280 : : *
281 : : * If handle != NULL, the queue can be read or written even before the
282 : : * other process has attached. We'll wait for it to do so if needed. The
283 : : * handle must be for a background worker initialized with bgw_notify_pid
284 : : * equal to our PID.
285 : : *
286 : : * shm_mq_detach() should be called when done. This will free the
287 : : * shm_mq_handle and mark the queue itself as detached, so that our
288 : : * counterpart won't get stuck waiting for us to fill or drain the queue
289 : : * after we've already lost interest.
290 : : */
291 : : shm_mq_handle *
292 : 5828 : shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
293 : : {
95 michael@paquier.xyz 294 :GNC 5828 : shm_mq_handle *mqh = palloc_object(shm_mq_handle);
295 : :
4443 rhaas@postgresql.org 296 [ + + - + ]:CBC 5828 : Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
297 : 5828 : mqh->mqh_queue = mq;
298 : 5828 : mqh->mqh_segment = seg;
299 : 5828 : mqh->mqh_handle = handle;
3118 tgl@sss.pgh.pa.us 300 : 5828 : mqh->mqh_buffer = NULL;
4443 rhaas@postgresql.org 301 : 5828 : mqh->mqh_buflen = 0;
302 : 5828 : mqh->mqh_consume_pending = 0;
1613 303 : 5828 : mqh->mqh_send_pending = 0;
4380 304 : 5828 : mqh->mqh_partial_bytes = 0;
3118 tgl@sss.pgh.pa.us 305 : 5828 : mqh->mqh_expected_bytes = 0;
4380 rhaas@postgresql.org 306 : 5828 : mqh->mqh_length_word_complete = false;
4443 307 : 5828 : mqh->mqh_counterparty_attached = false;
3118 tgl@sss.pgh.pa.us 308 : 5828 : mqh->mqh_context = CurrentMemoryContext;
309 : :
4443 rhaas@postgresql.org 310 [ + - ]: 5828 : if (seg != NULL)
311 : 5828 : on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
312 : :
313 : 5828 : return mqh;
314 : : }
315 : :
316 : : /*
317 : : * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
318 : : * been passed to shm_mq_attach.
319 : : */
320 : : void
4176 321 : 2836 : shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
322 : : {
323 [ - + ]: 2836 : Assert(mqh->mqh_handle == NULL);
324 : 2836 : mqh->mqh_handle = handle;
325 : 2836 : }
326 : :
327 : : /*
328 : : * Write a message into a shared message queue.
329 : : */
330 : : shm_mq_result
1613 331 : 1222256 : shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
332 : : bool force_flush)
333 : : {
334 : : shm_mq_iovec iov;
335 : :
4176 336 : 1222256 : iov.data = data;
337 : 1222256 : iov.len = nbytes;
338 : :
1613 339 : 1222256 : return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
340 : : }
341 : :
342 : : /*
343 : : * Write a message into a shared message queue, gathered from multiple
344 : : * addresses.
345 : : *
346 : : * When nowait = false, we'll wait on our process latch when the ring buffer
347 : : * fills up, and then continue writing once the receiver has drained some data.
348 : : * The process latch is reset after each wait.
349 : : *
350 : : * When nowait = true, we do not manipulate the state of the process latch;
351 : : * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
352 : : * this case, the caller should call this function again, with the same
353 : : * arguments, each time the process latch is set. (Once begun, the sending
354 : : * of a message cannot be aborted except by detaching from the queue; changing
355 : : * the length or payload will corrupt the queue.)
356 : : *
357 : : * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
358 : : * and notify the receiver (if it is already attached). Otherwise, we don't
359 : : * update it until we have written an amount of data greater than 1/4th of the
360 : : * ring size.
361 : : */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
			 bool force_flush)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	   *receiver;
	Size		nbytes = 0;		/* total payload length across all iovecs */
	Size		bytes_written;
	int			i;
	int			which_iov = 0;	/* iovec currently being drained */
	Size		offset;			/* progress within the current iovec */

	Assert(mq->mq_sender == MyProc);

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Prevent writing messages overwhelming the receiver. */
	if (nbytes > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot send a message of size %zu via shared memory queue",
						nbytes)));

	/*
	 * Try to write, or finish writing, the length word into the buffer.  On
	 * a nowait retry, mqh_partial_bytes remembers how much of the length
	 * word was already sent, so we resume mid-word.
	 */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;

		/*
		 * Once the whole length word is out, reset the partial counter so it
		 * can track payload progress instead.
		 */
		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			Assert(mqh->mqh_partial_bytes == sizeof(Size));

			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}

		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}

	/* Write the actual data bytes into the buffer. */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;

		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			/* Current iovec exhausted; move to the next, if any. */
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}

		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last.  Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;

			/* Gather up to MAXIMUM_ALIGNOF bytes spanning iovec boundaries. */
			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}

			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}

			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}

		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	/* If queue has been detached, let caller know. */
	if (mq->mq_detached)
		return SHM_MQ_DETACHED;

	/*
	 * If the counterparty is known to have attached, we can read mq_receiver
	 * without acquiring the spinlock.  Otherwise, more caution is needed.
	 */
	if (mqh->mqh_counterparty_attached)
		receiver = mq->mq_receiver;
	else
	{
		SpinLockAcquire(&mq->mq_mutex);
		receiver = mq->mq_receiver;
		SpinLockRelease(&mq->mq_mutex);
		if (receiver != NULL)
			mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If the caller has requested force flush or we have written more than
	 * 1/4 of the ring size, mark it as written in shared memory and notify
	 * the receiver.  Deferring this batches expensive SetLatch() calls.
	 */
	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
	{
		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
		if (receiver != NULL)
			SetLatch(&receiver->procLatch);
		mqh->mqh_send_pending = 0;
	}

	return SHM_MQ_SUCCESS;
}
550 : :
551 : : /*
552 : : * Receive a message from a shared message queue.
553 : : *
554 : : * We set *nbytes to the message length and *data to point to the message
555 : : * payload. If the entire message exists in the queue as a single,
556 : : * contiguous chunk, *data will point directly into shared memory; otherwise,
557 : : * it will point to a temporary buffer. This mostly avoids data copying in
558 : : * the hoped-for case where messages are short compared to the buffer size,
559 : : * while still allowing longer messages. In either case, the return value
560 : : * remains valid until the next receive operation is performed on the queue.
561 : : *
562 : : * When nowait = false, we'll wait on our process latch when the ring buffer
563 : : * is empty and we have not yet received a full message. The sender will
564 : : * set our process latch after more data has been written, and we'll resume
565 : : * processing. Each call will therefore return a complete message
566 : : * (unless the sender detaches the queue).
567 : : *
568 : : * When nowait = true, we do not manipulate the state of the process latch;
569 : : * instead, whenever the buffer is empty and we need to read from it, we
570 : : * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
571 : : * function again after the process latch has been set.
572 : : */
573 : : shm_mq_result
4380 574 : 2691174 : shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
575 : : {
4331 bruce@momjian.us 576 : 2691174 : shm_mq *mq = mqh->mqh_queue;
577 : : shm_mq_result res;
578 : 2691174 : Size rb = 0;
579 : : Size nbytes;
580 : : void *rawdata;
581 : :
4443 rhaas@postgresql.org 582 [ - + ]: 2691174 : Assert(mq->mq_receiver == MyProc);
583 : :
584 : : /* We can't receive data until the sender has attached. */
585 [ + + ]: 2691174 : if (!mqh->mqh_counterparty_attached)
586 : : {
587 [ + + ]: 1153753 : if (nowait)
588 : : {
589 : : int counterparty_gone;
590 : :
591 : : /*
592 : : * We shouldn't return at this point at all unless the sender
593 : : * hasn't attached yet. However, the correct return value depends
594 : : * on whether the sender is still attached. If we first test
595 : : * whether the sender has ever attached and then test whether the
596 : : * sender has detached, there's a race condition: a sender that
597 : : * attaches and detaches very quickly might fool us into thinking
598 : : * the sender never attached at all. So, test whether our
599 : : * counterparty is definitively gone first, and only afterwards
600 : : * check whether the sender ever attached in the first place.
601 : : */
3785 602 : 1153689 : counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
4443 603 [ + + ]: 1153689 : if (shm_mq_get_sender(mq) == NULL)
604 : : {
3785 605 [ - + ]: 1150889 : if (counterparty_gone)
3797 rhaas@postgresql.org 606 :UBC 0 : return SHM_MQ_DETACHED;
607 : : else
3785 rhaas@postgresql.org 608 :CBC 1150889 : return SHM_MQ_WOULD_BLOCK;
609 : : }
610 : : }
4337 611 [ + + ]: 64 : else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
612 [ - + ]: 24 : && shm_mq_get_sender(mq) == NULL)
613 : : {
4443 rhaas@postgresql.org 614 :UBC 0 : mq->mq_detached = true;
615 : 0 : return SHM_MQ_DETACHED;
616 : : }
4443 rhaas@postgresql.org 617 :CBC 2864 : mqh->mqh_counterparty_attached = true;
618 : : }
619 : :
620 : : /*
621 : : * If we've consumed an amount of data greater than 1/4th of the ring
622 : : * size, mark it consumed in shared memory. We try to avoid doing this
623 : : * unnecessarily when only a small amount of data has been consumed,
624 : : * because SetLatch() is fairly expensive and we don't want to do it too
625 : : * often.
626 : : */
2935 627 [ + + ]: 1540285 : if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
628 : : {
4443 629 : 19130 : shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
630 : 19130 : mqh->mqh_consume_pending = 0;
631 : : }
632 : :
633 : : /* Try to read, or finish reading, the length word from the buffer. */
4380 634 [ + + ]: 1562081 : while (!mqh->mqh_length_word_complete)
635 : : {
636 : : /* Try to receive the message length word. */
637 [ - + ]: 1534152 : Assert(mqh->mqh_partial_bytes < sizeof(Size));
2935 638 : 1534152 : res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
639 : : nowait, &rb, &rawdata);
4443 640 [ + + ]: 1534152 : if (res != SHM_MQ_SUCCESS)
641 : 320777 : return res;
642 : :
643 : : /*
644 : : * Hopefully, we'll receive the entire message length word at once.
645 : : * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
646 : : * multiple reads.
647 : : */
4380 648 [ + - + - ]: 1213375 : if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
4443 649 : 21796 : {
650 : : Size needed;
651 : :
4331 bruce@momjian.us 652 : 1213375 : nbytes = *(Size *) rawdata;
653 : :
654 : : /* If we've already got the whole message, we're done. */
4380 rhaas@postgresql.org 655 : 1213375 : needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
656 [ + + ]: 1213375 : if (rb >= needed)
657 : : {
2935 658 : 1191579 : mqh->mqh_consume_pending += needed;
4380 659 : 1191579 : *nbytesp = nbytes;
660 : 1191579 : *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
661 : 1191579 : return SHM_MQ_SUCCESS;
662 : : }
663 : :
664 : : /*
665 : : * We don't have the whole message, but we at least have the whole
666 : : * length word.
667 : : */
668 : 21796 : mqh->mqh_expected_bytes = nbytes;
669 : 21796 : mqh->mqh_length_word_complete = true;
2935 670 : 21796 : mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
4380 671 : 21796 : rb -= MAXALIGN(sizeof(Size));
672 : : }
673 : : else
674 : : {
675 : : Size lengthbytes;
676 : :
677 : : /* Can't be split unless bigger than required alignment. */
4380 rhaas@postgresql.org 678 :UBC 0 : Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
679 : :
680 : : /* Message word is split; need buffer to reassemble. */
681 : : if (mqh->mqh_buffer == NULL)
682 : : {
683 : : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
684 : : MQH_INITIAL_BUFSIZE);
685 : : mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
686 : : }
687 : : Assert(mqh->mqh_buflen >= sizeof(Size));
688 : :
689 : : /* Copy partial length word; remember to consume it. */
690 : : if (mqh->mqh_partial_bytes + rb > sizeof(Size))
691 : : lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
692 : : else
693 : : lengthbytes = rb;
694 : : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
695 : : lengthbytes);
696 : : mqh->mqh_partial_bytes += lengthbytes;
697 : : mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
698 : : rb -= lengthbytes;
699 : :
700 : : /* If we now have the whole word, we're ready to read payload. */
701 : : if (mqh->mqh_partial_bytes >= sizeof(Size))
702 : : {
703 : : Assert(mqh->mqh_partial_bytes == sizeof(Size));
704 : : mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
705 : : mqh->mqh_length_word_complete = true;
706 : : mqh->mqh_partial_bytes = 0;
707 : : }
708 : : }
709 : : }
4380 rhaas@postgresql.org 710 :CBC 27929 : nbytes = mqh->mqh_expected_bytes;
711 : :
712 : : /*
713 : : * Should be disallowed on the sending side already, but better check and
714 : : * error out on the receiver side as well rather than trying to read a
715 : : * prohibitively large message.
716 : : */
1973 peter@eisentraut.org 717 [ - + ]: 27929 : if (nbytes > MaxAllocSize)
1973 peter@eisentraut.org 718 [ # # ]:UBC 0 : ereport(ERROR,
719 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
720 : : errmsg("invalid message size %zu in shared memory queue",
721 : : nbytes)));
722 : :
4380 rhaas@postgresql.org 723 [ + + ]:CBC 27929 : if (mqh->mqh_partial_bytes == 0)
724 : : {
725 : : /*
726 : : * Try to obtain the whole message in a single chunk. If this works,
727 : : * we need not copy the data and can return a pointer directly into
728 : : * shared memory.
729 : : */
2935 730 : 24212 : res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
4443 731 [ + + ]: 24212 : if (res != SHM_MQ_SUCCESS)
732 : 2416 : return res;
733 [ + + ]: 21796 : if (rb >= nbytes)
734 : : {
4380 735 : 228 : mqh->mqh_length_word_complete = false;
2935 736 : 228 : mqh->mqh_consume_pending += MAXALIGN(nbytes);
4443 737 : 228 : *nbytesp = nbytes;
738 : 228 : *datap = rawdata;
739 : 228 : return SHM_MQ_SUCCESS;
740 : : }
741 : :
742 : : /*
743 : : * The message has wrapped the buffer. We'll need to copy it in order
744 : : * to return it to the client in one chunk. First, make sure we have
745 : : * a large enough buffer available.
746 : : */
747 [ + + ]: 21568 : if (mqh->mqh_buflen < nbytes)
748 : : {
749 : : Size newbuflen;
750 : :
751 : : /*
752 : : * Increase size to the next power of 2 that's >= nbytes, but
753 : : * limit to MaxAllocSize.
754 : : */
1694 tgl@sss.pgh.pa.us 755 : 143 : newbuflen = pg_nextpower2_size_t(nbytes);
1973 peter@eisentraut.org 756 : 143 : newbuflen = Min(newbuflen, MaxAllocSize);
757 : :
4443 rhaas@postgresql.org 758 [ - + ]: 143 : if (mqh->mqh_buffer != NULL)
759 : : {
4443 rhaas@postgresql.org 760 :UBC 0 : pfree(mqh->mqh_buffer);
761 : 0 : mqh->mqh_buffer = NULL;
762 : 0 : mqh->mqh_buflen = 0;
763 : : }
4443 rhaas@postgresql.org 764 :CBC 143 : mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
765 : 143 : mqh->mqh_buflen = newbuflen;
766 : : }
767 : : }
768 : :
769 : : /* Loop until we've copied the entire message. */
770 : : for (;;)
771 : 73996 : {
772 : : Size still_needed;
773 : :
774 : : /* Copy as much as we can. */
4380 775 [ - + ]: 99281 : Assert(mqh->mqh_partial_bytes + rb <= nbytes);
1473 tgl@sss.pgh.pa.us 776 [ + + ]: 99281 : if (rb > 0)
777 : : {
778 : 95564 : memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
779 : 95564 : mqh->mqh_partial_bytes += rb;
780 : : }
781 : :
782 : : /*
783 : : * Update count of bytes that can be consumed, accounting for
784 : : * alignment padding. Note that this will never actually insert any
785 : : * padding except at the end of a message, because the buffer size is
786 : : * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
787 : : */
4380 rhaas@postgresql.org 788 [ + + - + ]: 99281 : Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
2935 789 : 99281 : mqh->mqh_consume_pending += MAXALIGN(rb);
790 : :
791 : : /* If we got all the data, exit the loop. */
4380 792 [ + + ]: 99281 : if (mqh->mqh_partial_bytes >= nbytes)
4443 793 : 21568 : break;
794 : :
795 : : /* Wait for some more data. */
4380 796 : 77713 : still_needed = nbytes - mqh->mqh_partial_bytes;
2935 797 : 77713 : res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
4443 798 [ + + ]: 77713 : if (res != SHM_MQ_SUCCESS)
799 : 3717 : return res;
800 [ + + ]: 73996 : if (rb > still_needed)
801 : 20889 : rb = still_needed;
802 : : }
803 : :
804 : : /* Return the complete message, and reset for next message. */
805 : 21568 : *nbytesp = nbytes;
806 : 21568 : *datap = mqh->mqh_buffer;
4380 807 : 21568 : mqh->mqh_length_word_complete = false;
808 : 21568 : mqh->mqh_partial_bytes = 0;
4443 809 : 21568 : return SHM_MQ_SUCCESS;
810 : : }
811 : :
812 : : /*
813 : : * Wait for the other process that's supposed to use this queue to attach
814 : : * to it.
815 : : *
816 : : * The return value is SHM_MQ_DETACHED if the worker has already detached or
817 : : * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
818 : : * Note that we will only be able to detect that the worker has died before
819 : : * attaching if a background worker handle was passed to shm_mq_attach().
820 : : */
821 : : shm_mq_result
4443 rhaas@postgresql.org 822 :UBC 0 : shm_mq_wait_for_attach(shm_mq_handle *mqh)
823 : : {
824 : 0 : shm_mq *mq = mqh->mqh_queue;
825 : : PGPROC **victim;
826 : :
827 [ # # ]: 0 : if (shm_mq_get_receiver(mq) == MyProc)
828 : 0 : victim = &mq->mq_sender;
829 : : else
830 : : {
831 [ # # ]: 0 : Assert(shm_mq_get_sender(mq) == MyProc);
832 : 0 : victim = &mq->mq_receiver;
833 : : }
834 : :
835 [ # # ]: 0 : if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
836 : 0 : return SHM_MQ_SUCCESS;
837 : : else
838 : 0 : return SHM_MQ_DETACHED;
839 : : }
840 : :
/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 *
 * The handle (and any local receive buffer it owns) is pfree'd; the caller
 * must not use mqh afterward.  The shared queue itself survives in the DSM
 * segment so the counterparty can observe the detach.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
	/*
	 * Before detaching, notify the receiver about any already-written data.
	 * Data copied into the ring by shm_mq_send_bytes is only advertised
	 * lazily via mqh_send_pending, so flush it now or the receiver would
	 * never see the final bytes.
	 */
	if (mqh->mqh_send_pending > 0)
	{
		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
		mqh->mqh_send_pending = 0;
	}

	/* Notify counterparty that we're outta here. */
	shm_mq_detach_internal(mqh->mqh_queue);

	/*
	 * Cancel on_dsm_detach callback, if any.  We just detached explicitly,
	 * so the callback must not fire again at segment teardown.
	 */
	if (mqh->mqh_segment)
		cancel_on_dsm_detach(mqh->mqh_segment,
							 shm_mq_detach_callback,
							 PointerGetDatum(mqh->mqh_queue));

	/* Release local memory associated with handle. */
	if (mqh->mqh_buffer != NULL)
		pfree(mqh->mqh_buffer);
	pfree(mqh);
}
868 : :
/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
	PGPROC	   *victim;

	/*
	 * Read the counterparty's PGPROC and set mq_detached under the spinlock;
	 * mq_receiver/mq_sender are protected by mq_mutex (see file header).
	 */
	SpinLockAcquire(&mq->mq_mutex);
	if (mq->mq_sender == MyProc)
		victim = mq->mq_receiver;
	else
	{
		Assert(mq->mq_receiver == MyProc);
		victim = mq->mq_sender;
	}
	mq->mq_detached = true;
	SpinLockRelease(&mq->mq_mutex);

	/* Wake the counterparty, if it ever attached, so it notices the detach. */
	if (victim != NULL)
		SetLatch(&victim->procLatch);
}
902 : :
903 : : /*
904 : : * Get the shm_mq from handle.
905 : : */
906 : : shm_mq *
3831 907 : 2172747 : shm_mq_get_queue(shm_mq_handle *mqh)
908 : : {
909 : 2172747 : return mqh->mqh_queue;
910 : : }
911 : :
/*
 * Write bytes into a shared message queue.
 *
 * 'data' points to 'nbytes' bytes to be copied into the ring.  On return,
 * *bytes_written is set to the number of bytes placed in the ring so far
 * (equal to nbytes on SHM_MQ_SUCCESS).  Returns SHM_MQ_DETACHED if the
 * receiver detached, or SHM_MQ_WOULD_BLOCK when 'nowait' is true and the
 * ring is full (or the receiver has not yet attached).
 *
 * Bytes counted in mqh->mqh_send_pending have been copied into the ring but
 * not yet advertised via mq_bytes_written; they are flushed here when the
 * ring fills, or by shm_mq_detach().
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
				  bool nowait, Size *bytes_written)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		sent = 0;
	uint64		used;
	Size		ringsize = mq->mq_ring_size;
	Size		available;

	while (sent < nbytes)
	{
		uint64		rb;
		uint64		wb;

		/*
		 * Compute number of ring buffer bytes used and available.  Bytes we
		 * copied but have not yet advertised (mqh_send_pending) still occupy
		 * ring space, so count them as written.
		 */
		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
		Assert(wb >= rb);
		used = wb - rb;
		Assert(used <= ringsize);
		available = Min(ringsize - used, nbytes - sent);

		/*
		 * Bail out if the queue has been detached.  Note that we would be in
		 * trouble if the compiler decided to cache the value of
		 * mq->mq_detached in a register or on the stack across loop
		 * iterations.  It probably shouldn't do that anyway since we'll
		 * always return, call an external function that performs a system
		 * call, or reach a memory barrier at some point later in the loop,
		 * but just to be sure, insert a compiler barrier here.
		 */
		pg_compiler_barrier();
		if (mq->mq_detached)
		{
			*bytes_written = sent;
			return SHM_MQ_DETACHED;
		}

		if (available == 0 && !mqh->mqh_counterparty_attached)
		{
			/*
			 * The queue is full, so if the receiver isn't yet known to be
			 * attached, we must wait for that to happen.
			 */
			if (nowait)
			{
				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
				{
					*bytes_written = sent;
					return SHM_MQ_DETACHED;
				}
				if (shm_mq_get_receiver(mq) == NULL)
				{
					*bytes_written = sent;
					return SHM_MQ_WOULD_BLOCK;
				}
			}
			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
										   mqh->mqh_handle))
			{
				/* Receiver died or queue detached while we waited. */
				mq->mq_detached = true;
				*bytes_written = sent;
				return SHM_MQ_DETACHED;
			}
			mqh->mqh_counterparty_attached = true;

			/*
			 * The receiver may have read some data after attaching, so we
			 * must not wait without rechecking the queue state.
			 */
		}
		else if (available == 0)
		{
			/* Update the pending send bytes in the shared memory. */
			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);

			/*
			 * Since mq->mqh_counterparty_attached is known to be true at this
			 * point, mq_receiver has been set, and it can't change once set.
			 * Therefore, we can read it without acquiring the spinlock.
			 */
			Assert(mqh->mqh_counterparty_attached);
			SetLatch(&mq->mq_receiver->procLatch);

			/*
			 * We have just updated the mqh_send_pending bytes in the shared
			 * memory so reset it.
			 */
			mqh->mqh_send_pending = 0;

			/* Skip manipulation of our latch if nowait = true. */
			if (nowait)
			{
				*bytes_written = sent;
				return SHM_MQ_WOULD_BLOCK;
			}

			/*
			 * Wait for our latch to be set.  It might already be set for some
			 * unrelated reason, but that'll just result in one extra trip
			 * through the loop.  It's worth it to avoid resetting the latch
			 * at top of loop, because setting an already-set latch is much
			 * cheaper than setting one that has been reset.
			 */
			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
							 WAIT_EVENT_MESSAGE_QUEUE_SEND);

			/* Reset the latch so we don't spin. */
			ResetLatch(MyLatch);

			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			Size		offset;
			Size		sendnow;

			/* Copy up to the end of the ring in one contiguous chunk. */
			offset = wb % (uint64) ringsize;
			sendnow = Min(available, ringsize - offset);

			/*
			 * Write as much data as we can via a single memcpy(). Make sure
			 * these writes happen after the read of mq_bytes_read, above.
			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
			 * (Since we're separating the read of mq_bytes_read from a
			 * subsequent write to mq_ring, we need a full barrier here.)
			 */
			pg_memory_barrier();
			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
				   (const char *) data + sent, sendnow);
			sent += sendnow;

			/*
			 * Update count of bytes written, with alignment padding.  Note
			 * that this will never actually insert any padding except at the
			 * end of a run of bytes, because the buffer size is a multiple of
			 * MAXIMUM_ALIGNOF, and each read is as well.
			 */
			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));

			/*
			 * For efficiency, we don't update the bytes written in the shared
			 * memory and also don't set the reader's latch here.  Refer to
			 * the comments atop the shm_mq_handle structure for more
			 * information.
			 */
			mqh->mqh_send_pending += MAXALIGN(sendnow);
		}
	}

	*bytes_written = sent;
	return SHM_MQ_SUCCESS;
}
1070 : :
/*
 * Wait until at least bytes_needed bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 *
 * Note: the caller consumes data by advancing mqh->mqh_consume_pending;
 * this routine treats such pending bytes as already read.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
					 Size *nbytesp, void **datap)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		ringsize = mq->mq_ring_size;
	uint64		used;
	uint64		written;

	for (;;)
	{
		Size		offset;
		uint64		read;

		/* Get bytes written, so we can compute what's available to read. */
		written = pg_atomic_read_u64(&mq->mq_bytes_written);

		/*
		 * Get bytes read.  Include bytes we could consume but have not yet
		 * consumed.
		 */
		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
			mqh->mqh_consume_pending;
		used = written - read;
		Assert(used <= ringsize);
		offset = read % (uint64) ringsize;

		/* If we have enough data or buffer has wrapped, we're done. */
		if (used >= bytes_needed || offset + used >= ringsize)
		{
			/* Only the contiguous run up to the ring's end is returned. */
			*nbytesp = Min(used, ringsize - offset);
			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];

			/*
			 * Separate the read of mq_bytes_written, above, from caller's
			 * attempt to read the data itself.  Pairs with the barrier in
			 * shm_mq_inc_bytes_written.
			 */
			pg_read_barrier();
			return SHM_MQ_SUCCESS;
		}

		/*
		 * Fall out before waiting if the queue has been detached.
		 *
		 * Note that we don't check for this until *after* considering whether
		 * the data already available is enough, since the receiver can finish
		 * receiving a message stored in the buffer even after the sender has
		 * detached.
		 */
		if (mq->mq_detached)
		{
			/*
			 * If the writer advanced mq_bytes_written and then set
			 * mq_detached, we might not have read the final value of
			 * mq_bytes_written above.  Insert a read barrier and then check
			 * again if mq_bytes_written has advanced.
			 */
			pg_read_barrier();
			if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
				continue;

			return SHM_MQ_DETACHED;
		}

		/*
		 * We didn't get enough data to satisfy the request, so mark any data
		 * previously-consumed as read to make more buffer space.
		 */
		if (mqh->mqh_consume_pending > 0)
		{
			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
			mqh->mqh_consume_pending = 0;
		}

		/* Skip manipulation of our latch if nowait = true. */
		if (nowait)
			return SHM_MQ_WOULD_BLOCK;

		/*
		 * Wait for our latch to be set.  It might already be set for some
		 * unrelated reason, but that'll just result in one extra trip through
		 * the loop.  It's worth it to avoid resetting the latch at top of
		 * loop, because setting an already-set latch is much cheaper than
		 * setting one that has been reset.
		 */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MESSAGE_QUEUE_RECEIVE);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}
1176 : :
1177 : : /*
1178 : : * Test whether a counterparty who may not even be alive yet is definitely gone.
1179 : : */
1180 : : static bool
2936 andres@anarazel.de 1181 : 1153693 : shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1182 : : {
1183 : : pid_t pid;
1184 : :
1185 : : /* If the queue has been detached, counterparty is definitely gone. */
2935 rhaas@postgresql.org 1186 [ + + ]: 1153693 : if (mq->mq_detached)
3797 1187 : 1087 : return true;
1188 : :
1189 : : /* If there's a handle, check worker status. */
1190 [ + + ]: 1152606 : if (handle != NULL)
1191 : : {
1192 : : BgwHandleStatus status;
1193 : :
1194 : : /* Check for unexpected worker death. */
1195 : 1152587 : status = GetBackgroundWorkerPid(handle, &pid);
1196 [ + + - + ]: 1152587 : if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1197 : : {
1198 : : /* Mark it detached, just to make it official. */
3797 rhaas@postgresql.org 1199 :UBC 0 : mq->mq_detached = true;
1200 : 0 : return true;
1201 : : }
1202 : : }
1203 : :
1204 : : /* Counterparty is not definitively gone. */
3797 rhaas@postgresql.org 1205 :CBC 1152606 : return false;
1206 : : }
1207 : :
/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 *
 * Returns true once *ptr is observed non-NULL; returns false if the queue
 * is detached or (when handle != NULL) the worker has died.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
	bool		result = false;

	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;

		/* Acquire the lock just long enough to check the pointer. */
		SpinLockAcquire(&mq->mq_mutex);
		result = (*ptr != NULL);
		SpinLockRelease(&mq->mq_mutex);

		/* Fail if detached; else succeed if initialized. */
		if (mq->mq_detached)
		{
			result = false;
			break;
		}
		if (result)
			break;

		if (handle != NULL)
		{
			/* Check for unexpected worker death. */
			status = GetBackgroundWorkerPid(handle, &pid);
			if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
			{
				result = false;
				break;
			}
		}

		/* Wait to be signaled. */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MESSAGE_QUEUE_INTERNAL);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}

	return result;
}
1267 : :
/*
 * Increment the number of bytes read, and wake the sender.
 *
 * 'n' is the count of ring bytes to mark consumed.  Only the receiver may
 * call this (mq_bytes_read is receiver-owned; see file header).
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
	PGPROC	   *sender;

	/*
	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
	 * which follows.  This pairs with the full barrier in
	 * shm_mq_send_bytes().  We only need a read barrier here because the
	 * increment of mq_bytes_read is actually a read followed by a dependent
	 * write.
	 */
	pg_read_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method should be cheaper.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_read,
						pg_atomic_read_u64(&mq->mq_bytes_read) + n);

	/*
	 * We shouldn't have any bytes to read without a sender, so we can read
	 * mq_sender here without a lock.  Once it's initialized, it can't change.
	 */
	sender = mq->mq_sender;
	Assert(sender != NULL);
	SetLatch(&sender->procLatch);
}
1300 : :
/*
 * Increment the number of bytes written.
 *
 * 'n' is the count of ring bytes to advertise to the receiver.  Only the
 * sender may call this (mq_bytes_written is sender-owned; see file header).
 * Unlike shm_mq_inc_bytes_read, this does not set any latch; callers wake
 * the receiver separately when needed.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
	/*
	 * Separate prior reads of mq_ring from the write of mq_bytes_written
	 * which we're about to do.  Pairs with the read barrier found in
	 * shm_mq_receive_bytes.
	 */
	pg_write_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method avoids taking the bus
	 * lock unnecessarily.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_written,
						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}
1322 : :
1323 : : /* Shim for on_dsm_detach callback. */
1324 : : static void
1325 : 1599 : shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1326 : : {
1327 : 1599 : shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1328 : :
3118 tgl@sss.pgh.pa.us 1329 : 1599 : shm_mq_detach_internal(mq);
4443 rhaas@postgresql.org 1330 : 1599 : }
|