Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeAppend.c
4 : : * routines to handle append nodes.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/executor/nodeAppend.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : /* INTERFACE ROUTINES
16 : : * ExecInitAppend - initialize the append node
17 : : * ExecAppend - retrieve the next tuple from the node
18 : : * ExecEndAppend - shut down the append node
19 : : * ExecReScanAppend - rescan the append node
20 : : *
21 : : * NOTES
22 : : * Each append node contains a list of one or more subplans which
23 : : * must be iteratively processed (forwards or backwards).
24 : : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : : * until the subplan stops returning tuples, at which point that
26 : : * plan is shut down and the next started up.
27 : : *
28 : : * Append nodes don't make use of their left and right
29 : : * subtrees, rather they maintain a list of subplans so
30 : : * a typical append node looks like this in the plan tree:
31 : : *
32 : : * ...
33 : : * /
34 : : * Append -------+------+------+--- nil
35 : : * / \ | | |
36 : : * nil nil ... ... ...
37 : : * subplans
38 : : *
39 : : * Append nodes are currently used for unions, and to support
40 : : * inheritance queries, where several relations need to be scanned.
41 : : * For example, in our standard person/student/employee/student-emp
42 : : * example, where student and employee inherit from person
43 : : * and student-emp inherits from student and employee, the
44 : : * query:
45 : : *
46 : : * select name from person
47 : : *
48 : : * generates the plan:
49 : : *
50 : : * |
51 : : * Append -------+-------+--------+--------+
52 : : * / \ | | | |
53 : : * nil nil Scan Scan Scan Scan
54 : : * | | | |
55 : : * person employee student student-emp
56 : : */
57 : :
58 : : #include "postgres.h"
59 : :
60 : : #include "executor/execAsync.h"
61 : : #include "executor/execPartition.h"
62 : : #include "executor/executor.h"
63 : : #include "executor/nodeAppend.h"
64 : : #include "miscadmin.h"
65 : : #include "pgstat.h"
66 : : #include "storage/latch.h"
67 : :
68 : : /* Shared state for parallel-aware Append. */
69 : : struct ParallelAppendState
70 : : {
71 : : LWLock pa_lock; /* mutual exclusion to choose next subplan */
72 : : int pa_next_plan; /* next plan to choose by any worker */
73 : :
74 : : /*
75 : : * pa_finished[i] should be true if no more workers should select subplan
76 : : * i. for a non-partial plan, this should be set to true as soon as a
77 : : * worker selects the plan; for a partial plan, it remains false until
78 : : * some worker executes the plan to completion.
79 : : */
80 : : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
81 : : };
82 : :
83 : : #define INVALID_SUBPLAN_INDEX -1
84 : : #define EVENT_BUFFER_SIZE 16
85 : :
86 : : static TupleTableSlot *ExecAppend(PlanState *pstate);
87 : : static bool choose_next_subplan_locally(AppendState *node);
88 : : static bool choose_next_subplan_for_leader(AppendState *node);
89 : : static bool choose_next_subplan_for_worker(AppendState *node);
90 : : static void mark_invalid_subplans_as_finished(AppendState *node);
91 : : static void ExecAppendAsyncBegin(AppendState *node);
92 : : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93 : : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94 : : static void ExecAppendAsyncEventWait(AppendState *node);
95 : : static void classify_matching_subplans(AppendState *node);
96 : :
97 : : /* ----------------------------------------------------------------
98 : : * ExecInitAppend
99 : : *
100 : : * Begin all of the subscans of the append node.
101 : : *
102 : : * (This is potentially wasteful, since the entire result of the
103 : : * append node may not be scanned, but this way all of the
104 : : * structures get allocated in the executor's top level memory
105 : : * block instead of that of the call to ExecAppend.)
106 : : * ----------------------------------------------------------------
107 : : */
108 : : AppendState *
7130 tgl@sss.pgh.pa.us 109 :CBC 8664 : ExecInitAppend(Append *node, EState *estate, int eflags)
110 : : {
8311 111 : 8664 : AppendState *appendstate = makeNode(AppendState);
112 : : PlanState **appendplanstates;
113 : : const TupleTableSlotOps *appendops;
114 : : Bitmapset *validsubplans;
115 : : Bitmapset *asyncplans;
116 : : int nplans;
117 : : int nasyncplans;
118 : : int firstvalid;
119 : : int i,
120 : : j;
121 : :
122 : : /* check for unsupported flags */
7130 123 [ - + ]: 8664 : Assert(!(eflags & EXEC_FLAG_MARK));
124 : :
125 : : /*
126 : : * create new AppendState for our append node
127 : : */
8311 128 : 8664 : appendstate->ps.plan = (Plan *) node;
129 : 8664 : appendstate->ps.state = estate;
2973 andres@anarazel.de 130 : 8664 : appendstate->ps.ExecProcNode = ExecAppend;
131 : :
132 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2709 alvherre@alvh.no-ip. 133 : 8664 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
1620 efujita@postgresql.o 134 : 8664 : appendstate->as_syncdone = false;
135 : 8664 : appendstate->as_begun = false;
136 : :
137 : : /* If run-time partition pruning is enabled, then set that up now */
219 amitlan@postgresql.o 138 [ + + ]: 8664 : if (node->part_prune_index >= 0)
139 : : {
140 : : PartitionPruneState *prunestate;
141 : :
142 : : /*
143 : : * Set up pruning data structure. This also initializes the set of
144 : : * subplans to initialize (validsubplans) by taking into account the
145 : : * result of performing initial pruning if any.
146 : : */
218 147 : 366 : prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
148 : 366 : list_length(node->appendplans),
149 : : node->part_prune_index,
150 : : node->apprelids,
151 : : &validsubplans);
2642 tgl@sss.pgh.pa.us 152 : 366 : appendstate->as_prune_state = prunestate;
1250 alvherre@alvh.no-ip. 153 : 366 : nplans = bms_num_members(validsubplans);
154 : :
155 : : /*
156 : : * When no run-time pruning is required and there's at least one
157 : : * subplan, we can fill as_valid_subplans immediately, preventing
158 : : * later calls to ExecFindMatchingSubPlans.
159 : : */
2096 tgl@sss.pgh.pa.us 160 [ + + + + ]: 366 : if (!prunestate->do_exec_prune && nplans > 0)
161 : : {
2709 alvherre@alvh.no-ip. 162 : 133 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
919 tgl@sss.pgh.pa.us 163 : 133 : appendstate->as_valid_subplans_identified = true;
164 : : }
165 : : }
166 : : else
167 : : {
2709 alvherre@alvh.no-ip. 168 : 8298 : nplans = list_length(node->appendplans);
169 : :
170 : : /*
171 : : * When run-time partition pruning is not enabled we can just mark all
172 : : * subplans as valid; they must also all be initialized.
173 : : */
2595 174 [ - + ]: 8298 : Assert(nplans > 0);
2709 175 : 8298 : appendstate->as_valid_subplans = validsubplans =
176 : 8298 : bms_add_range(NULL, 0, nplans - 1);
919 tgl@sss.pgh.pa.us 177 : 8298 : appendstate->as_valid_subplans_identified = true;
2709 alvherre@alvh.no-ip. 178 : 8298 : appendstate->as_prune_state = NULL;
179 : : }
180 : :
181 : 8664 : appendplanstates = (PlanState **) palloc(nplans *
182 : : sizeof(PlanState *));
183 : :
184 : : /*
185 : : * call ExecInitNode on each of the valid plans to be executed and save
186 : : * the results into the appendplanstates array.
187 : : *
188 : : * While at it, find out the first valid partial plan.
189 : : */
2238 drowley@postgresql.o 190 : 8664 : j = 0;
1620 efujita@postgresql.o 191 : 8664 : asyncplans = NULL;
192 : 8664 : nasyncplans = 0;
2699 alvherre@alvh.no-ip. 193 : 8664 : firstvalid = nplans;
2238 drowley@postgresql.o 194 : 8664 : i = -1;
195 [ + + ]: 32776 : while ((i = bms_next_member(validsubplans, i)) >= 0)
196 : : {
197 : 24112 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
198 : :
199 : : /*
200 : : * Record async subplans. When executing EvalPlanQual, we treat them
201 : : * as sync ones; don't do this when initializing an EvalPlanQual plan
202 : : * tree.
203 : : */
1620 efujita@postgresql.o 204 [ + + + - ]: 24112 : if (initNode->async_capable && estate->es_epq_active == NULL)
205 : : {
206 : 93 : asyncplans = bms_add_member(asyncplans, j);
207 : 93 : nasyncplans++;
208 : : }
209 : :
210 : : /*
211 : : * Record the lowest appendplans index which is a valid partial plan.
212 : : */
2238 drowley@postgresql.o 213 [ + + + + ]: 24112 : if (i >= node->first_partial_plan && j < firstvalid)
214 : 225 : firstvalid = j;
215 : :
216 : 24112 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
217 : : }
218 : :
2699 alvherre@alvh.no-ip. 219 : 8664 : appendstate->as_first_partial_plan = firstvalid;
2709 220 : 8664 : appendstate->appendplans = appendplanstates;
221 : 8664 : appendstate->as_nplans = nplans;
222 : :
223 : : /*
224 : : * Initialize Append's result tuple type and slot. If the child plans all
225 : : * produce the same fixed slot type, we can use that slot type; otherwise
226 : : * make a virtual slot. (Note that the result slot itself is used only to
227 : : * return a null tuple at end of execution; real tuples are returned to
228 : : * the caller in the children's own result slots. What we are doing here
229 : : * is allowing the parent plan node to optimize if the Append will return
230 : : * only one kind of slot.)
231 : : */
261 tgl@sss.pgh.pa.us 232 : 8664 : appendops = ExecGetCommonSlotOps(appendplanstates, j);
233 [ + + ]: 8664 : if (appendops != NULL)
234 : : {
235 : 8164 : ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
236 : : }
237 : : else
238 : : {
239 : 500 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
240 : : /* show that the output slot type is not fixed */
241 : 500 : appendstate->ps.resultopsset = true;
242 : 500 : appendstate->ps.resultopsfixed = false;
243 : : }
244 : :
245 : : /* Initialize async state */
1620 efujita@postgresql.o 246 : 8664 : appendstate->as_asyncplans = asyncplans;
247 : 8664 : appendstate->as_nasyncplans = nasyncplans;
248 : 8664 : appendstate->as_asyncrequests = NULL;
1552 249 : 8664 : appendstate->as_asyncresults = NULL;
250 : 8664 : appendstate->as_nasyncresults = 0;
251 : 8664 : appendstate->as_nasyncremain = 0;
1620 252 : 8664 : appendstate->as_needrequest = NULL;
253 : 8664 : appendstate->as_eventset = NULL;
1552 254 : 8664 : appendstate->as_valid_asyncplans = NULL;
255 : :
1620 256 [ + + ]: 8664 : if (nasyncplans > 0)
257 : : {
258 : 47 : appendstate->as_asyncrequests = (AsyncRequest **)
259 : 47 : palloc0(nplans * sizeof(AsyncRequest *));
260 : :
261 : 47 : i = -1;
262 [ + + ]: 140 : while ((i = bms_next_member(asyncplans, i)) >= 0)
263 : : {
264 : : AsyncRequest *areq;
265 : :
266 : 93 : areq = palloc(sizeof(AsyncRequest));
267 : 93 : areq->requestor = (PlanState *) appendstate;
268 : 93 : areq->requestee = appendplanstates[i];
269 : 93 : areq->request_index = i;
270 : 93 : areq->callback_pending = false;
271 : 93 : areq->request_complete = false;
272 : 93 : areq->result = NULL;
273 : :
274 : 93 : appendstate->as_asyncrequests[i] = areq;
275 : : }
276 : :
1552 277 : 47 : appendstate->as_asyncresults = (TupleTableSlot **)
278 : 47 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
279 : :
919 tgl@sss.pgh.pa.us 280 [ + + ]: 47 : if (appendstate->as_valid_subplans_identified)
1552 efujita@postgresql.o 281 : 44 : classify_matching_subplans(appendstate);
282 : : }
283 : :
284 : : /*
285 : : * Miscellaneous initialization
286 : : */
287 : :
2709 alvherre@alvh.no-ip. 288 : 8664 : appendstate->ps.ps_ProjInfo = NULL;
289 : :
290 : : /* For parallel query, this will be overridden later. */
2832 rhaas@postgresql.org 291 : 8664 : appendstate->choose_next_subplan = choose_next_subplan_locally;
292 : :
8311 tgl@sss.pgh.pa.us 293 : 8664 : return appendstate;
294 : : }
295 : :
296 : : /* ----------------------------------------------------------------
297 : : * ExecAppend
298 : : *
299 : : * Handles iteration over multiple subplans.
300 : : * ----------------------------------------------------------------
301 : : */
302 : : static TupleTableSlot *
2973 andres@anarazel.de 303 : 1315261 : ExecAppend(PlanState *pstate)
304 : : {
305 : 1315261 : AppendState *node = castNode(AppendState, pstate);
306 : : TupleTableSlot *result;
307 : :
308 : : /*
309 : : * If this is the first call after Init or ReScan, we need to do the
310 : : * initialization work.
311 : : */
1620 efujita@postgresql.o 312 [ + + ]: 1315261 : if (!node->as_begun)
313 : : {
314 [ - + ]: 15578 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
315 [ - + ]: 15578 : Assert(!node->as_syncdone);
316 : :
317 : : /* Nothing to do if there are no subplans */
2096 tgl@sss.pgh.pa.us 318 [ + + ]: 15578 : if (node->as_nplans == 0)
319 : 30 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
320 : :
321 : : /* If there are any async subplans, begin executing them. */
1620 efujita@postgresql.o 322 [ + + ]: 15548 : if (node->as_nasyncplans > 0)
323 : 37 : ExecAppendAsyncBegin(node);
324 : :
325 : : /*
326 : : * If no sync subplan has been chosen, we must choose one before
327 : : * proceeding.
328 : : */
329 [ + + + + ]: 15548 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
2709 alvherre@alvh.no-ip. 330 : 1574 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
331 : :
1620 efujita@postgresql.o 332 [ + + + - : 13974 : Assert(node->as_syncdone ||
- + ]
333 : : (node->as_whichplan >= 0 &&
334 : : node->as_whichplan < node->as_nplans));
335 : :
336 : : /* And we're initialized. */
337 : 13974 : node->as_begun = true;
338 : : }
339 : :
340 : : for (;;)
10226 bruce@momjian.us 341 : 18501 : {
342 : : PlanState *subnode;
343 : :
2965 andres@anarazel.de 344 [ - + ]: 1332158 : CHECK_FOR_INTERRUPTS();
345 : :
346 : : /*
347 : : * try to get a tuple from an async subplan if any
348 : : */
1620 efujita@postgresql.o 349 [ + + - + ]: 1332158 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
350 : : {
351 [ + - ]: 6137 : if (ExecAppendAsyncGetNext(node, &result))
352 : 6136 : return result;
1620 efujita@postgresql.o 353 [ # # ]:UBC 0 : Assert(!node->as_syncdone);
354 [ # # ]: 0 : Assert(bms_is_empty(node->as_needrequest));
355 : : }
356 : :
357 : : /*
358 : : * figure out which sync subplan we are currently processing
359 : : */
2832 rhaas@postgresql.org 360 [ + - - + ]:CBC 1326021 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
7412 tgl@sss.pgh.pa.us 361 : 1326021 : subnode = node->appendplans[node->as_whichplan];
362 : :
363 : : /*
364 : : * get a tuple from the subplan
365 : : */
366 : 1326021 : result = ExecProcNode(subnode);
367 : :
368 [ + + + + ]: 1325995 : if (!TupIsNull(result))
369 : : {
370 : : /*
371 : : * If the subplan gave us something then return it as-is. We do
372 : : * NOT make use of the result slot that was set up in
373 : : * ExecInitAppend; there's no need for it.
374 : : */
375 : 1293847 : return result;
376 : : }
377 : :
378 : : /*
379 : : * wait or poll for async events if any. We do this before checking
380 : : * for the end of iteration, because it might drain the remaining
381 : : * async subplans.
382 : : */
1620 efujita@postgresql.o 383 [ + + ]: 32148 : if (node->as_nasyncremain > 0)
384 : 17 : ExecAppendAsyncEventWait(node);
385 : :
386 : : /* choose new sync subplan; if no sync/async subplans, we're done */
387 [ + + + + ]: 32148 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
7412 tgl@sss.pgh.pa.us 388 : 13647 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
389 : : }
390 : : }
391 : :
392 : : /* ----------------------------------------------------------------
393 : : * ExecEndAppend
394 : : *
395 : : * Shuts down the subscans of the append node.
396 : : *
397 : : * Returns nothing of interest.
398 : : * ----------------------------------------------------------------
399 : : */
400 : : void
8311 401 : 8510 : ExecEndAppend(AppendState *node)
402 : : {
403 : : PlanState **appendplans;
404 : : int nplans;
405 : : int i;
406 : :
407 : : /*
408 : : * get information from the node
409 : : */
9915 bruce@momjian.us 410 : 8510 : appendplans = node->appendplans;
8311 tgl@sss.pgh.pa.us 411 : 8510 : nplans = node->as_nplans;
412 : :
413 : : /*
414 : : * shut down each of the subscans
415 : : */
10226 bruce@momjian.us 416 [ + + ]: 32254 : for (i = 0; i < nplans; i++)
5810 tgl@sss.pgh.pa.us 417 : 23744 : ExecEndNode(appendplans[i]);
10226 bruce@momjian.us 418 : 8510 : }
419 : :
420 : : void
5535 tgl@sss.pgh.pa.us 421 : 9780 : ExecReScanAppend(AppendState *node)
422 : : {
1620 efujita@postgresql.o 423 : 9780 : int nasyncplans = node->as_nasyncplans;
424 : : int i;
425 : :
426 : : /*
427 : : * If any PARAM_EXEC Params used in pruning expressions have changed, then
428 : : * we'd better unset the valid subplans so that they are reselected for
429 : : * the new parameter values.
430 : : */
2709 alvherre@alvh.no-ip. 431 [ + + + - ]: 11414 : if (node->as_prune_state &&
432 : 1634 : bms_overlap(node->ps.chgParam,
2645 tgl@sss.pgh.pa.us 433 : 1634 : node->as_prune_state->execparamids))
434 : : {
919 435 : 1634 : node->as_valid_subplans_identified = false;
2709 alvherre@alvh.no-ip. 436 : 1634 : bms_free(node->as_valid_subplans);
437 : 1634 : node->as_valid_subplans = NULL;
919 tgl@sss.pgh.pa.us 438 : 1634 : bms_free(node->as_valid_asyncplans);
439 : 1634 : node->as_valid_asyncplans = NULL;
440 : : }
441 : :
5810 442 [ + + ]: 40995 : for (i = 0; i < node->as_nplans; i++)
443 : : {
8069 bruce@momjian.us 444 : 31215 : PlanState *subnode = node->appendplans[i];
445 : :
446 : : /*
447 : : * ExecReScan doesn't know about my subplans, so I have to do
448 : : * changed-parameter signaling myself.
449 : : */
8245 tgl@sss.pgh.pa.us 450 [ + + ]: 31215 : if (node->ps.chgParam != NULL)
451 : 29056 : UpdateChangedParamSet(subnode, node->ps.chgParam);
452 : :
453 : : /*
454 : : * If chgParam of subnode is not null then plan will be re-scanned by
455 : : * first ExecProcNode or by first ExecAsyncRequest.
456 : : */
5535 457 [ + + ]: 31215 : if (subnode->chgParam == NULL)
458 : 8333 : ExecReScan(subnode);
459 : : }
460 : :
461 : : /* Reset async state */
1620 efujita@postgresql.o 462 [ + + ]: 9780 : if (nasyncplans > 0)
463 : : {
464 : 17 : i = -1;
465 [ + + ]: 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
466 : : {
467 : 34 : AsyncRequest *areq = node->as_asyncrequests[i];
468 : :
469 : 34 : areq->callback_pending = false;
470 : 34 : areq->request_complete = false;
471 : 34 : areq->result = NULL;
472 : : }
473 : :
1552 474 : 17 : node->as_nasyncresults = 0;
475 : 17 : node->as_nasyncremain = 0;
1620 476 : 17 : bms_free(node->as_needrequest);
477 : 17 : node->as_needrequest = NULL;
478 : : }
479 : :
480 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2709 alvherre@alvh.no-ip. 481 : 9780 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
1620 efujita@postgresql.o 482 : 9780 : node->as_syncdone = false;
483 : 9780 : node->as_begun = false;
2832 rhaas@postgresql.org 484 : 9780 : }
485 : :
486 : : /* ----------------------------------------------------------------
487 : : * Parallel Append Support
488 : : * ----------------------------------------------------------------
489 : : */
490 : :
491 : : /* ----------------------------------------------------------------
492 : : * ExecAppendEstimate
493 : : *
494 : : * Compute the amount of space we'll need in the parallel
495 : : * query DSM, and inform pcxt->estimator about our needs.
496 : : * ----------------------------------------------------------------
497 : : */
498 : : void
499 : 69 : ExecAppendEstimate(AppendState *node,
500 : : ParallelContext *pcxt)
501 : : {
502 : 69 : node->pstate_len =
503 : 69 : add_size(offsetof(ParallelAppendState, pa_finished),
504 : 69 : sizeof(bool) * node->as_nplans);
505 : :
506 : 69 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
507 : 69 : shm_toc_estimate_keys(&pcxt->estimator, 1);
508 : 69 : }
509 : :
510 : :
511 : : /* ----------------------------------------------------------------
512 : : * ExecAppendInitializeDSM
513 : : *
514 : : * Set up shared state for Parallel Append.
515 : : * ----------------------------------------------------------------
516 : : */
517 : : void
518 : 69 : ExecAppendInitializeDSM(AppendState *node,
519 : : ParallelContext *pcxt)
520 : : {
521 : : ParallelAppendState *pstate;
522 : :
523 : 69 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
524 : 69 : memset(pstate, 0, node->pstate_len);
525 : 69 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
526 : 69 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
527 : :
528 : 69 : node->as_pstate = pstate;
529 : 69 : node->choose_next_subplan = choose_next_subplan_for_leader;
530 : 69 : }
531 : :
532 : : /* ----------------------------------------------------------------
533 : : * ExecAppendReInitializeDSM
534 : : *
535 : : * Reset shared state before beginning a fresh scan.
536 : : * ----------------------------------------------------------------
537 : : */
538 : : void
2832 rhaas@postgresql.org 539 :UBC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
540 : : {
541 : 0 : ParallelAppendState *pstate = node->as_pstate;
542 : :
543 : 0 : pstate->pa_next_plan = 0;
544 : 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
545 : 0 : }
546 : :
547 : : /* ----------------------------------------------------------------
548 : : * ExecAppendInitializeWorker
549 : : *
550 : : * Copy relevant information from TOC into planstate, and initialize
551 : : * whatever is required to choose and execute the optimal subplan.
552 : : * ----------------------------------------------------------------
553 : : */
554 : : void
2832 rhaas@postgresql.org 555 :CBC 159 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
556 : : {
557 : 159 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
558 : 159 : node->choose_next_subplan = choose_next_subplan_for_worker;
559 : 159 : }
560 : :
561 : : /* ----------------------------------------------------------------
562 : : * choose_next_subplan_locally
563 : : *
564 : : * Choose next sync subplan for a non-parallel-aware Append,
565 : : * returning false if there are no more.
566 : : * ----------------------------------------------------------------
567 : : */
568 : : static bool
569 : 47209 : choose_next_subplan_locally(AppendState *node)
570 : : {
571 : 47209 : int whichplan = node->as_whichplan;
572 : : int nextplan;
573 : :
574 : : /* We should never be called when there are no subplans */
2096 tgl@sss.pgh.pa.us 575 [ - + ]: 47209 : Assert(node->as_nplans > 0);
576 : :
577 : : /* Nothing to do if syncdone */
1620 efujita@postgresql.o 578 [ + + ]: 47209 : if (node->as_syncdone)
579 : 18 : return false;
580 : :
581 : : /*
582 : : * If first call then have the bms member function choose the first valid
583 : : * sync subplan by initializing whichplan to -1. If there happen to be no
584 : : * valid sync subplans then the bms member function will handle that by
585 : : * returning a negative number which will allow us to exit returning a
586 : : * false value.
587 : : */
2709 alvherre@alvh.no-ip. 588 [ + + ]: 47191 : if (whichplan == INVALID_SUBPLAN_INDEX)
589 : : {
1620 efujita@postgresql.o 590 [ + + ]: 15329 : if (node->as_nasyncplans > 0)
591 : : {
592 : : /* We'd have filled as_valid_subplans already */
919 tgl@sss.pgh.pa.us 593 [ - + ]: 19 : Assert(node->as_valid_subplans_identified);
594 : : }
595 [ + + ]: 15310 : else if (!node->as_valid_subplans_identified)
596 : : {
2709 alvherre@alvh.no-ip. 597 : 1691 : node->as_valid_subplans =
211 amitlan@postgresql.o 598 : 1691 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
919 tgl@sss.pgh.pa.us 599 : 1691 : node->as_valid_subplans_identified = true;
600 : : }
601 : :
2709 alvherre@alvh.no-ip. 602 : 15329 : whichplan = -1;
603 : : }
604 : :
605 : : /* Ensure whichplan is within the expected range */
606 [ + - - + ]: 47191 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
607 : :
608 [ + + ]: 47191 : if (ScanDirectionIsForward(node->ps.state->es_direction))
609 : 47182 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
610 : : else
611 : 9 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
612 : :
613 [ + + ]: 47191 : if (nextplan < 0)
614 : : {
615 : : /* Set as_syncdone if in async mode */
1620 efujita@postgresql.o 616 [ + + ]: 15036 : if (node->as_nasyncplans > 0)
617 : 17 : node->as_syncdone = true;
2709 alvherre@alvh.no-ip. 618 : 15036 : return false;
619 : : }
620 : :
621 : 32155 : node->as_whichplan = nextplan;
622 : :
2832 rhaas@postgresql.org 623 : 32155 : return true;
624 : : }
625 : :
626 : : /* ----------------------------------------------------------------
627 : : * choose_next_subplan_for_leader
628 : : *
629 : : * Try to pick a plan which doesn't commit us to doing much
630 : : * work locally, so that as much work as possible is done in
631 : : * the workers. Cheapest subplans are at the end.
632 : : * ----------------------------------------------------------------
633 : : */
634 : : static bool
635 : 234 : choose_next_subplan_for_leader(AppendState *node)
636 : : {
637 : 234 : ParallelAppendState *pstate = node->as_pstate;
638 : :
639 : : /* Backward scan is not supported by parallel-aware plans */
640 [ - + ]: 234 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
641 : :
642 : : /* We should never be called when there are no subplans */
2096 tgl@sss.pgh.pa.us 643 [ - + ]: 234 : Assert(node->as_nplans > 0);
644 : :
2832 rhaas@postgresql.org 645 : 234 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
646 : :
647 [ + + ]: 234 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
648 : : {
649 : : /* Mark just-completed subplan as finished. */
650 : 174 : node->as_pstate->pa_finished[node->as_whichplan] = true;
651 : : }
652 : : else
653 : : {
654 : : /* Start with last subplan. */
655 : 60 : node->as_whichplan = node->as_nplans - 1;
656 : :
657 : : /*
658 : : * If we've yet to determine the valid subplans then do so now. If
659 : : * run-time pruning is disabled then the valid subplans will always be
660 : : * set to all subplans.
661 : : */
919 tgl@sss.pgh.pa.us 662 [ + + ]: 60 : if (!node->as_valid_subplans_identified)
663 : : {
2709 alvherre@alvh.no-ip. 664 : 12 : node->as_valid_subplans =
211 amitlan@postgresql.o 665 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
919 tgl@sss.pgh.pa.us 666 : 12 : node->as_valid_subplans_identified = true;
667 : :
668 : : /*
669 : : * Mark each invalid plan as finished to allow the loop below to
670 : : * select the first valid subplan.
671 : : */
2709 alvherre@alvh.no-ip. 672 : 12 : mark_invalid_subplans_as_finished(node);
673 : : }
674 : : }
675 : :
676 : : /* Loop until we find a subplan to execute. */
2832 rhaas@postgresql.org 677 [ + + ]: 384 : while (pstate->pa_finished[node->as_whichplan])
678 : : {
679 [ + + ]: 210 : if (node->as_whichplan == 0)
680 : : {
681 : 60 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
682 : 60 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
683 : 60 : LWLockRelease(&pstate->pa_lock);
684 : 60 : return false;
685 : : }
686 : :
687 : : /*
688 : : * We needn't pay attention to as_valid_subplans here as all invalid
689 : : * plans have been marked as finished.
690 : : */
691 : 150 : node->as_whichplan--;
692 : : }
693 : :
694 : : /* If non-partial, immediately mark as finished. */
2699 alvherre@alvh.no-ip. 695 [ + + ]: 174 : if (node->as_whichplan < node->as_first_partial_plan)
2832 rhaas@postgresql.org 696 : 45 : node->as_pstate->pa_finished[node->as_whichplan] = true;
697 : :
698 : 174 : LWLockRelease(&pstate->pa_lock);
699 : :
700 : 174 : return true;
701 : : }
702 : :
703 : : /* ----------------------------------------------------------------
704 : : * choose_next_subplan_for_worker
705 : : *
706 : : * Choose next subplan for a parallel-aware Append, returning
707 : : * false if there are no more.
708 : : *
709 : : * We start from the first plan and advance through the list;
710 : : * when we get back to the end, we loop back to the first
711 : : * partial plan. This assigns the non-partial plans first in
712 : : * order of descending cost and then spreads out the workers
713 : : * as evenly as possible across the remaining partial plans.
714 : : * ----------------------------------------------------------------
715 : : */
716 : : static bool
717 : 253 : choose_next_subplan_for_worker(AppendState *node)
718 : : {
719 : 253 : ParallelAppendState *pstate = node->as_pstate;
720 : :
721 : : /* Backward scan is not supported by parallel-aware plans */
722 [ - + ]: 253 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
723 : :
724 : : /* We should never be called when there are no subplans */
2096 tgl@sss.pgh.pa.us 725 [ - + ]: 253 : Assert(node->as_nplans > 0);
726 : :
2832 rhaas@postgresql.org 727 : 253 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
728 : :
729 : : /* Mark just-completed subplan as finished. */
730 [ + + ]: 253 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
731 : 112 : node->as_pstate->pa_finished[node->as_whichplan] = true;
732 : :
733 : : /*
734 : : * If we've yet to determine the valid subplans then do so now. If
735 : : * run-time pruning is disabled then the valid subplans will always be set
736 : : * to all subplans.
737 : : */
919 tgl@sss.pgh.pa.us 738 [ + + ]: 141 : else if (!node->as_valid_subplans_identified)
739 : : {
2709 alvherre@alvh.no-ip. 740 : 12 : node->as_valid_subplans =
211 amitlan@postgresql.o 741 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
919 tgl@sss.pgh.pa.us 742 : 12 : node->as_valid_subplans_identified = true;
743 : :
2709 alvherre@alvh.no-ip. 744 : 12 : mark_invalid_subplans_as_finished(node);
745 : : }
746 : :
747 : : /* If all the plans are already done, we have nothing to do */
2832 rhaas@postgresql.org 748 [ + + ]: 253 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
749 : : {
750 : 123 : LWLockRelease(&pstate->pa_lock);
751 : 123 : return false;
752 : : }
753 : :
754 : : /* Save the plan from which we are starting the search. */
2767 755 : 130 : node->as_whichplan = pstate->pa_next_plan;
756 : :
757 : : /* Loop until we find a valid subplan to execute. */
2832 758 [ + + ]: 221 : while (pstate->pa_finished[pstate->pa_next_plan])
759 : : {
760 : : int nextplan;
761 : :
2707 alvherre@alvh.no-ip. 762 : 109 : nextplan = bms_next_member(node->as_valid_subplans,
763 : : pstate->pa_next_plan);
764 [ + + ]: 109 : if (nextplan >= 0)
765 : : {
766 : : /* Advance to the next valid plan. */
767 : 76 : pstate->pa_next_plan = nextplan;
768 : : }
2699 769 [ + + ]: 33 : else if (node->as_whichplan > node->as_first_partial_plan)
770 : : {
771 : : /*
772 : : * Try looping back to the first valid partial plan, if there is
773 : : * one. If there isn't, arrange to bail out below.
774 : : */
2707 775 : 29 : nextplan = bms_next_member(node->as_valid_subplans,
2699 776 : 29 : node->as_first_partial_plan - 1);
2707 777 : 29 : pstate->pa_next_plan =
778 [ - + ]: 29 : nextplan < 0 ? node->as_whichplan : nextplan;
779 : : }
780 : : else
781 : : {
782 : : /*
783 : : * At last plan, and either there are no partial plans or we've
784 : : * tried them all. Arrange to bail out.
785 : : */
2832 rhaas@postgresql.org 786 : 4 : pstate->pa_next_plan = node->as_whichplan;
787 : : }
788 : :
789 [ + + ]: 109 : if (pstate->pa_next_plan == node->as_whichplan)
790 : : {
791 : : /* We've tried everything! */
792 : 18 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
793 : 18 : LWLockRelease(&pstate->pa_lock);
794 : 18 : return false;
795 : : }
796 : : }
797 : :
798 : : /* Pick the plan we found, and advance pa_next_plan one more time. */
2707 alvherre@alvh.no-ip. 799 : 112 : node->as_whichplan = pstate->pa_next_plan;
800 : 112 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
801 : : pstate->pa_next_plan);
802 : :
803 : : /*
804 : : * If there are no more valid plans then try setting the next plan to the
805 : : * first valid partial plan.
806 : : */
807 [ + + ]: 112 : if (pstate->pa_next_plan < 0)
808 : : {
809 : 16 : int nextplan = bms_next_member(node->as_valid_subplans,
2699 810 : 16 : node->as_first_partial_plan - 1);
811 : :
2707 812 [ + - ]: 16 : if (nextplan >= 0)
813 : 16 : pstate->pa_next_plan = nextplan;
814 : : else
815 : : {
816 : : /*
817 : : * There are no valid partial plans, and we already chose the last
818 : : * non-partial plan; so flag that there's nothing more for our
819 : : * fellow workers to do.
820 : : */
2831 rhaas@postgresql.org 821 :UBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
822 : : }
823 : : }
824 : :
825 : : /* If non-partial, immediately mark as finished. */
2699 alvherre@alvh.no-ip. 826 [ + + ]:CBC 112 : if (node->as_whichplan < node->as_first_partial_plan)
2832 rhaas@postgresql.org 827 : 24 : node->as_pstate->pa_finished[node->as_whichplan] = true;
828 : :
829 : 112 : LWLockRelease(&pstate->pa_lock);
830 : :
831 : 112 : return true;
832 : : }
833 : :
834 : : /*
835 : : * mark_invalid_subplans_as_finished
836 : : * Marks the ParallelAppendState's pa_finished as true for each invalid
837 : : * subplan.
838 : : *
839 : : * This function should only be called for parallel Append with run-time
840 : : * pruning enabled.
841 : : */
842 : : static void
2709 alvherre@alvh.no-ip. 843 : 24 : mark_invalid_subplans_as_finished(AppendState *node)
844 : : {
845 : : int i;
846 : :
847 : : /* Only valid to call this while in parallel Append mode */
848 [ - + ]: 24 : Assert(node->as_pstate);
849 : :
850 : : /* Shouldn't have been called when run-time pruning is not enabled */
851 [ - + ]: 24 : Assert(node->as_prune_state);
852 : :
853 : : /* Nothing to do if all plans are valid */
854 [ - + ]: 24 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
2709 alvherre@alvh.no-ip. 855 :UBC 0 : return;
856 : :
857 : : /* Mark all non-valid plans as finished */
2709 alvherre@alvh.no-ip. 858 [ + + ]:CBC 81 : for (i = 0; i < node->as_nplans; i++)
859 : : {
860 [ + + ]: 57 : if (!bms_is_member(i, node->as_valid_subplans))
861 : 24 : node->as_pstate->pa_finished[i] = true;
862 : : }
863 : : }
864 : :
865 : : /* ----------------------------------------------------------------
866 : : * Asynchronous Append Support
867 : : * ----------------------------------------------------------------
868 : : */
869 : :
870 : : /* ----------------------------------------------------------------
871 : : * ExecAppendAsyncBegin
872 : : *
873 : : * Begin executing designed async-capable subplans.
874 : : * ----------------------------------------------------------------
875 : : */
876 : : static void
1620 efujita@postgresql.o 877 : 37 : ExecAppendAsyncBegin(AppendState *node)
878 : : {
879 : : int i;
880 : :
881 : : /* Backward scan is not supported by async-aware Appends. */
882 [ - + ]: 37 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
883 : :
884 : : /* We should never be called when there are no subplans */
1552 885 [ - + ]: 37 : Assert(node->as_nplans > 0);
886 : :
887 : : /* We should never be called when there are no async subplans. */
1620 888 [ - + ]: 37 : Assert(node->as_nasyncplans > 0);
889 : :
890 : : /* If we've yet to determine the valid subplans then do so now. */
919 tgl@sss.pgh.pa.us 891 [ + + ]: 37 : if (!node->as_valid_subplans_identified)
892 : : {
1620 efujita@postgresql.o 893 : 2 : node->as_valid_subplans =
211 amitlan@postgresql.o 894 : 2 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
919 tgl@sss.pgh.pa.us 895 : 2 : node->as_valid_subplans_identified = true;
896 : :
1552 efujita@postgresql.o 897 : 2 : classify_matching_subplans(node);
898 : : }
899 : :
900 : : /* Initialize state variables. */
901 : 37 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
902 : 37 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
903 : :
904 : : /* Nothing to do if there are no valid async subplans. */
1620 905 [ - + ]: 37 : if (node->as_nasyncremain == 0)
1620 efujita@postgresql.o 906 :UBC 0 : return;
907 : :
908 : : /* Make a request for each of the valid async subplans. */
1620 efujita@postgresql.o 909 :CBC 37 : i = -1;
910 [ + + ]: 109 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
911 : : {
912 : 72 : AsyncRequest *areq = node->as_asyncrequests[i];
913 : :
914 [ - + ]: 72 : Assert(areq->request_index == i);
915 [ - + ]: 72 : Assert(!areq->callback_pending);
916 : :
917 : : /* Do the actual work. */
918 : 72 : ExecAsyncRequest(areq);
919 : : }
920 : : }
921 : :
922 : : /* ----------------------------------------------------------------
923 : : * ExecAppendAsyncGetNext
924 : : *
925 : : * Get the next tuple from any of the asynchronous subplans.
926 : : * ----------------------------------------------------------------
927 : : */
928 : : static bool
929 : 6137 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
930 : : {
931 : 6137 : *result = NULL;
932 : :
933 : : /* We should never be called when there are no valid async subplans. */
934 [ - + ]: 6137 : Assert(node->as_nasyncremain > 0);
935 : :
936 : : /* Request a tuple asynchronously. */
937 [ + + ]: 6137 : if (ExecAppendAsyncRequest(node, result))
938 : 6041 : return true;
939 : :
940 [ + + ]: 135 : while (node->as_nasyncremain > 0)
941 : : {
942 [ - + ]: 105 : CHECK_FOR_INTERRUPTS();
943 : :
944 : : /* Wait or poll for async events. */
945 : 105 : ExecAppendAsyncEventWait(node);
946 : :
947 : : /* Request a tuple asynchronously. */
948 [ + + ]: 104 : if (ExecAppendAsyncRequest(node, result))
949 : 65 : return true;
950 : :
951 : : /* Break from loop if there's any sync subplan that isn't complete. */
952 [ - + ]: 39 : if (!node->as_syncdone)
1620 efujita@postgresql.o 953 :UBC 0 : break;
954 : : }
955 : :
956 : : /*
957 : : * If all sync subplans are complete, we're totally done scanning the
958 : : * given node. Otherwise, we're done with the asynchronous stuff but must
959 : : * continue scanning the sync subplans.
960 : : */
1620 efujita@postgresql.o 961 [ + - ]:CBC 30 : if (node->as_syncdone)
962 : : {
963 [ - + ]: 30 : Assert(node->as_nasyncremain == 0);
964 : 30 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
965 : 30 : return true;
966 : : }
967 : :
1620 efujita@postgresql.o 968 :UBC 0 : return false;
969 : : }
970 : :
971 : : /* ----------------------------------------------------------------
972 : : * ExecAppendAsyncRequest
973 : : *
974 : : * Request a tuple asynchronously.
975 : : * ----------------------------------------------------------------
976 : : */
977 : : static bool
1620 efujita@postgresql.o 978 :CBC 6241 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
979 : : {
980 : : Bitmapset *needrequest;
981 : : int i;
982 : :
983 : : /* Nothing to do if there are no async subplans needing a new request. */
984 [ + + ]: 6241 : if (bms_is_empty(node->as_needrequest))
985 : : {
1597 986 [ - + ]: 62 : Assert(node->as_nasyncresults == 0);
1620 987 : 62 : return false;
988 : : }
989 : :
990 : : /*
991 : : * If there are any asynchronously-generated results that have not yet
992 : : * been returned, we have nothing to do; just return one of them.
993 : : */
994 [ + + ]: 6179 : if (node->as_nasyncresults > 0)
995 : : {
996 : 1289 : --node->as_nasyncresults;
997 : 1289 : *result = node->as_asyncresults[node->as_nasyncresults];
998 : 1289 : return true;
999 : : }
1000 : :
1001 : : /* Make a new request for each of the async subplans that need it. */
1002 : 4890 : needrequest = node->as_needrequest;
1003 : 4890 : node->as_needrequest = NULL;
1004 : 4890 : i = -1;
1005 [ + + ]: 10993 : while ((i = bms_next_member(needrequest, i)) >= 0)
1006 : : {
1007 : 6103 : AsyncRequest *areq = node->as_asyncrequests[i];
1008 : :
1009 : : /* Do the actual work. */
1010 : 6103 : ExecAsyncRequest(areq);
1011 : : }
1012 : 4890 : bms_free(needrequest);
1013 : :
1014 : : /* Return one of the asynchronously-generated results if any. */
1015 [ + + ]: 4890 : if (node->as_nasyncresults > 0)
1016 : : {
1017 : 4817 : --node->as_nasyncresults;
1018 : 4817 : *result = node->as_asyncresults[node->as_nasyncresults];
1019 : 4817 : return true;
1020 : : }
1021 : :
1022 : 73 : return false;
1023 : : }
1024 : :
1025 : : /* ----------------------------------------------------------------
1026 : : * ExecAppendAsyncEventWait
1027 : : *
1028 : : * Wait or poll for file descriptor events and fire callbacks.
1029 : : * ----------------------------------------------------------------
1030 : : */
1031 : : static void
1032 : 122 : ExecAppendAsyncEventWait(AppendState *node)
1033 : : {
178 heikki.linnakangas@i 1034 : 122 : int nevents = node->as_nasyncplans + 2;
1620 efujita@postgresql.o 1035 [ + + ]: 122 : long timeout = node->as_syncdone ? -1 : 0;
1036 : : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1037 : : int noccurred;
1038 : : int i;
1039 : :
1040 : : /* We should never be called when there are no valid async subplans. */
1041 [ - + ]: 122 : Assert(node->as_nasyncremain > 0);
1042 : :
653 heikki.linnakangas@i 1043 [ - + ]: 122 : Assert(node->as_eventset == NULL);
1044 : 122 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1620 efujita@postgresql.o 1045 : 122 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1046 : : NULL, NULL);
1047 : :
1048 : : /* Give each waiting subplan a chance to add an event. */
1049 : 122 : i = -1;
1050 [ + + ]: 374 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1051 : : {
1052 : 253 : AsyncRequest *areq = node->as_asyncrequests[i];
1053 : :
1054 [ + + ]: 253 : if (areq->callback_pending)
1055 : 221 : ExecAsyncConfigureWait(areq);
1056 : : }
1057 : :
1058 : : /*
1059 : : * No need for further processing if none of the subplans configured any
1060 : : * events.
1061 : : */
1499 1062 [ + + ]: 121 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1063 : : {
1064 : 1 : FreeWaitEventSet(node->as_eventset);
1065 : 1 : node->as_eventset = NULL;
1066 : 6 : return;
1067 : : }
1068 : :
1069 : : /*
1070 : : * Add the process latch to the set, so that we wake up to process the
1071 : : * standard interrupts with CHECK_FOR_INTERRUPTS().
1072 : : *
1073 : : * NOTE: For historical reasons, it's important that this is added to the
1074 : : * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1075 : : * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1076 : : * any other events are in the set. That's a poor design, it's
1077 : : * questionable for postgres_fdw to be doing that in the first place, but
1078 : : * we cannot change it now. The pattern has possibly been copied to other
1079 : : * extensions too.
1080 : : */
178 heikki.linnakangas@i 1081 : 120 : AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
1082 : : MyLatch, NULL);
1083 : :
1084 : : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1597 efujita@postgresql.o 1085 [ - + ]: 120 : if (nevents > EVENT_BUFFER_SIZE)
1597 efujita@postgresql.o 1086 :UBC 0 : nevents = EVENT_BUFFER_SIZE;
1087 : :
1088 : : /*
1089 : : * If the timeout is -1, wait until at least one event occurs. If the
1090 : : * timeout is 0, poll for events, but do not wait at all.
1091 : : */
1620 efujita@postgresql.o 1092 :CBC 120 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1093 : : nevents, WAIT_EVENT_APPEND_READY);
1094 : 120 : FreeWaitEventSet(node->as_eventset);
1095 : 120 : node->as_eventset = NULL;
1096 [ + + ]: 120 : if (noccurred == 0)
1097 : 5 : return;
1098 : :
1099 : : /* Deliver notifications. */
1100 [ + + ]: 264 : for (i = 0; i < noccurred; i++)
1101 : : {
1102 : 149 : WaitEvent *w = &occurred_event[i];
1103 : :
1104 : : /*
1105 : : * Each waiting subplan should have registered its wait event with
1106 : : * user_data pointing back to its AsyncRequest.
1107 : : */
1108 [ + - ]: 149 : if ((w->events & WL_SOCKET_READABLE) != 0)
1109 : : {
1110 : 149 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1111 : :
1496 1112 [ + - ]: 149 : if (areq->callback_pending)
1113 : : {
1114 : : /*
1115 : : * Mark it as no longer needing a callback. We must do this
1116 : : * before dispatching the callback in case the callback resets
1117 : : * the flag.
1118 : : */
1119 : 149 : areq->callback_pending = false;
1120 : :
1121 : : /* Do the actual work. */
1122 : 149 : ExecAsyncNotify(areq);
1123 : : }
1124 : : }
1125 : :
1126 : : /* Handle standard interrupts */
178 heikki.linnakangas@i 1127 [ - + ]: 149 : if ((w->events & WL_LATCH_SET) != 0)
1128 : : {
178 heikki.linnakangas@i 1129 :UBC 0 : ResetLatch(MyLatch);
1130 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
1131 : : }
1132 : : }
1133 : : }
1134 : :
1135 : : /* ----------------------------------------------------------------
1136 : : * ExecAsyncAppendResponse
1137 : : *
1138 : : * Receive a response from an asynchronous request we made.
1139 : : * ----------------------------------------------------------------
1140 : : */
1141 : : void
1620 efujita@postgresql.o 1142 :CBC 6328 : ExecAsyncAppendResponse(AsyncRequest *areq)
1143 : : {
1144 : 6328 : AppendState *node = (AppendState *) areq->requestor;
1145 : 6328 : TupleTableSlot *slot = areq->result;
1146 : :
1147 : : /* The result should be a TupleTableSlot or NULL. */
1148 [ + + - + ]: 6328 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1149 : :
1150 : : /* Nothing to do if the request is pending. */
1151 [ + + ]: 6328 : if (!areq->request_complete)
1152 : : {
1153 : : /* The request would have been pending for a callback. */
1154 [ - + ]: 161 : Assert(areq->callback_pending);
1155 : 161 : return;
1156 : : }
1157 : :
1158 : : /* If the result is NULL or an empty slot, there's nothing more to do. */
1159 [ + + - + ]: 6167 : if (TupIsNull(slot))
1160 : : {
1161 : : /* The ending subplan wouldn't have been pending for a callback. */
1162 [ - + ]: 60 : Assert(!areq->callback_pending);
1163 : 60 : --node->as_nasyncremain;
1164 : 60 : return;
1165 : : }
1166 : :
1167 : : /* Save result so we can return it. */
1168 [ - + ]: 6107 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1169 : 6107 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1170 : :
1171 : : /*
1172 : : * Mark the subplan that returned a result as ready for a new request. We
1173 : : * don't launch another one here immediately because it might complete.
1174 : : */
1175 : 6107 : node->as_needrequest = bms_add_member(node->as_needrequest,
1176 : : areq->request_index);
1177 : : }
1178 : :
1179 : : /* ----------------------------------------------------------------
1180 : : * classify_matching_subplans
1181 : : *
1182 : : * Classify the node's as_valid_subplans into sync ones and
1183 : : * async ones, adjust it to contain sync ones only, and save
1184 : : * async ones in the node's as_valid_asyncplans.
1185 : : * ----------------------------------------------------------------
1186 : : */
1187 : : static void
1188 : 46 : classify_matching_subplans(AppendState *node)
1189 : : {
1190 : : Bitmapset *valid_asyncplans;
1191 : :
919 tgl@sss.pgh.pa.us 1192 [ - + ]: 46 : Assert(node->as_valid_subplans_identified);
1620 efujita@postgresql.o 1193 [ - + ]: 46 : Assert(node->as_valid_asyncplans == NULL);
1194 : :
1195 : : /* Nothing to do if there are no valid subplans. */
1196 [ - + ]: 46 : if (bms_is_empty(node->as_valid_subplans))
1197 : : {
1620 efujita@postgresql.o 1198 :UBC 0 : node->as_syncdone = true;
1199 : 0 : node->as_nasyncremain = 0;
1200 : 0 : return;
1201 : : }
1202 : :
1203 : : /* Nothing to do if there are no valid async subplans. */
1620 efujita@postgresql.o 1204 [ - + ]:CBC 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1205 : : {
1620 efujita@postgresql.o 1206 :UBC 0 : node->as_nasyncremain = 0;
1207 : 0 : return;
1208 : : }
1209 : :
1210 : : /* Get valid async subplans. */
919 tgl@sss.pgh.pa.us 1211 :CBC 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1212 : 46 : node->as_valid_subplans);
1213 : :
1214 : : /* Adjust the valid subplans to contain sync subplans only. */
1620 efujita@postgresql.o 1215 : 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1216 : : valid_asyncplans);
1217 : :
1218 : : /* Save valid async subplans. */
1219 : 46 : node->as_valid_asyncplans = valid_asyncplans;
1220 : : }
|