Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * tqueue.c
4 : : * Use shm_mq to send & receive tuples between parallel backends
5 : : *
6 : : * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
7 : : * under the hood, writes tuples from the executor to a shm_mq.
8 : : *
9 : : * A TupleQueueReader reads tuples from a shm_mq and returns the tuples.
10 : : *
11 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
12 : : * Portions Copyright (c) 1994, Regents of the University of California
13 : : *
14 : : * IDENTIFICATION
15 : : * src/backend/executor/tqueue.c
16 : : *
17 : : *-------------------------------------------------------------------------
18 : : */
19 : :
20 : : #include "postgres.h"
21 : :
22 : : #include "access/htup_details.h"
23 : : #include "executor/tqueue.h"
24 : :
25 : : /*
26 : : * DestReceiver object's private contents
27 : : *
28 : : * queue is a pointer to data supplied by DestReceiver's caller.
29 : : */
30 : : typedef struct TQueueDestReceiver
31 : : {
32 : : DestReceiver pub; /* public fields */
33 : : shm_mq_handle *queue; /* shm_mq to send to */
34 : : } TQueueDestReceiver;
35 : :
36 : : /*
37 : : * TupleQueueReader object's private contents
38 : : *
39 : : * queue is a pointer to data supplied by reader's caller.
40 : : *
41 : : * "typedef struct TupleQueueReader TupleQueueReader" is in tqueue.h
42 : : */
43 : : struct TupleQueueReader
44 : : {
45 : : shm_mq_handle *queue; /* shm_mq to receive from */
46 : : };
47 : :
48 : : /*
49 : : * Receive a tuple from a query, and send it to the designated shm_mq.
50 : : *
51 : : * Returns true if successful, false if shm_mq has been detached.
52 : : */
53 : : static bool
3641 rhaas@postgresql.org 54 :CBC 1067453 : tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
55 : : {
56 : 1067453 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
57 : : MinimalTuple tuple;
58 : : shm_mq_result result;
59 : : bool should_free;
60 : :
61 : : /* Send the tuple itself. */
1877 tmunro@postgresql.or 62 : 1067453 : tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
1423 rhaas@postgresql.org 63 : 1067453 : result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
64 : :
2487 andres@anarazel.de 65 [ + + ]: 1067453 : if (should_free)
1877 tmunro@postgresql.or 66 : 1044287 : pfree(tuple);
67 : :
68 : : /* Check for failure. */
3379 rhaas@postgresql.org 69 [ - + ]: 1067453 : if (result == SHM_MQ_DETACHED)
3379 rhaas@postgresql.org 70 :UBC 0 : return false;
3379 rhaas@postgresql.org 71 [ - + ]:CBC 1067453 : else if (result != SHM_MQ_SUCCESS)
3379 rhaas@postgresql.org 72 [ # # ]:UBC 0 : ereport(ERROR,
73 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
74 : : errmsg("could not send tuple to shared-memory queue")));
75 : :
3379 rhaas@postgresql.org 76 :CBC 1067453 : return true;
77 : : }
78 : :
79 : : /*
80 : : * Prepare to receive tuples from executor.
81 : : */
82 : : static void
3641 83 : 1267 : tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
84 : : {
85 : : /* do nothing */
86 : 1267 : }
87 : :
88 : : /*
89 : : * Clean up at end of an executor run
90 : : */
91 : : static void
92 : 1261 : tqueueShutdownReceiver(DestReceiver *self)
93 : : {
3631 94 : 1261 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
95 : :
2928 tgl@sss.pgh.pa.us 96 [ + - ]: 1261 : if (tqueue->queue != NULL)
97 : 1261 : shm_mq_detach(tqueue->queue);
98 : 1261 : tqueue->queue = NULL;
3641 rhaas@postgresql.org 99 : 1261 : }
100 : :
101 : : /*
102 : : * Destroy receiver when done with it
103 : : */
104 : : static void
105 : 1261 : tqueueDestroyReceiver(DestReceiver *self)
106 : : {
3592 107 : 1261 : TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
108 : :
109 : : /* We probably already detached from queue, but let's be sure */
2928 tgl@sss.pgh.pa.us 110 [ - + ]: 1261 : if (tqueue->queue != NULL)
2928 tgl@sss.pgh.pa.us 111 :UBC 0 : shm_mq_detach(tqueue->queue);
3641 rhaas@postgresql.org 112 :CBC 1261 : pfree(self);
113 : 1261 : }
114 : :
115 : : /*
116 : : * Create a DestReceiver that writes tuples to a tuple queue.
117 : : */
118 : : DestReceiver *
119 : 1267 : CreateTupleQueueDestReceiver(shm_mq_handle *handle)
120 : : {
121 : : TQueueDestReceiver *self;
122 : :
123 : 1267 : self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
124 : :
125 : 1267 : self->pub.receiveSlot = tqueueReceiveSlot;
126 : 1267 : self->pub.rStartup = tqueueStartupReceiver;
127 : 1267 : self->pub.rShutdown = tqueueShutdownReceiver;
128 : 1267 : self->pub.rDestroy = tqueueDestroyReceiver;
129 : 1267 : self->pub.mydest = DestTupleQueue;
3324 tgl@sss.pgh.pa.us 130 : 1267 : self->queue = handle;
131 : :
3641 rhaas@postgresql.org 132 : 1267 : return (DestReceiver *) self;
133 : : }
134 : :
135 : : /*
136 : : * Create a tuple queue reader.
137 : : */
138 : : TupleQueueReader *
2914 andres@anarazel.de 139 : 1267 : CreateTupleQueueReader(shm_mq_handle *handle)
140 : : {
3592 rhaas@postgresql.org 141 : 1267 : TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
142 : :
143 : 1267 : reader->queue = handle;
144 : :
145 : 1267 : return reader;
146 : : }
147 : :
148 : : /*
149 : : * Destroy a tuple queue reader.
150 : : *
151 : : * Note: cleaning up the underlying shm_mq is the caller's responsibility.
152 : : * We won't access it here, as it may be detached already.
153 : : */
154 : : void
155 : 1261 : DestroyTupleQueueReader(TupleQueueReader *reader)
156 : : {
157 : 1261 : pfree(reader);
158 : 1261 : }
159 : :
160 : : /*
161 : : * Fetch a tuple from a tuple queue reader.
162 : : *
163 : : * The return value is NULL if there are no remaining tuples or if
164 : : * nowait = true and no tuple is ready to return. *done, if not NULL,
165 : : * is set to true when there are no remaining tuples and otherwise to false.
166 : : *
167 : : * The returned tuple, if any, is either in shared memory or a private buffer
168 : : * and should not be freed. The pointer is invalid after the next call to
169 : : * TupleQueueReaderNext().
170 : : *
171 : : * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
172 : : * accumulate bytes from a partially-read message, so it's useful to call
173 : : * this with nowait = true even if nothing is returned.
174 : : */
175 : : MinimalTuple
176 : 2515942 : TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
177 : : {
178 : : MinimalTuple tuple;
179 : : shm_mq_result result;
180 : : Size nbytes;
181 : : void *data;
182 : :
183 [ + - ]: 2515942 : if (done != NULL)
184 : 2515942 : *done = false;
185 : :
186 : : /* Attempt to read a message. */
2914 andres@anarazel.de 187 : 2515942 : result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
188 : :
189 : : /* If queue is detached, set *done and return NULL. */
190 [ + + ]: 2515942 : if (result == SHM_MQ_DETACHED)
191 : : {
192 [ + - ]: 1258 : if (done != NULL)
193 : 1258 : *done = true;
194 : 1258 : return NULL;
195 : : }
196 : :
197 : : /* In non-blocking mode, bail out if no message ready yet. */
198 [ + + ]: 2514684 : if (result == SHM_MQ_WOULD_BLOCK)
199 : 1449681 : return NULL;
200 [ - + ]: 1065003 : Assert(result == SHM_MQ_SUCCESS);
201 : :
202 : : /*
203 : : * Return a pointer to the queue memory directly (which had better be
204 : : * sufficiently aligned).
205 : : */
1877 tmunro@postgresql.or 206 : 1065003 : tuple = (MinimalTuple) data;
207 [ - + ]: 1065003 : Assert(tuple->t_len == nbytes);
208 : :
209 : 1065003 : return tuple;
210 : : }
|