Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeAppend.c
4 : : * routines to handle append nodes.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/executor/nodeAppend.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : /*
16 : : * INTERFACE ROUTINES
17 : : * ExecInitAppend - initialize the append node
18 : : * ExecAppend - retrieve the next tuple from the node
19 : : * ExecEndAppend - shut down the append node
20 : : * ExecReScanAppend - rescan the append node
21 : : *
22 : : * NOTES
23 : : * Each append node contains a list of one or more subplans which
24 : : * must be iteratively processed (forwards or backwards).
25 : : * Tuples are retrieved by executing the 'whichplan'th subplan
26 : : * until the subplan stops returning tuples, at which point that
27 : : * plan is shut down and the next started up.
28 : : *
29 : : * Append nodes don't make use of their left and right
30 : : * subtrees, rather they maintain a list of subplans so
31 : : * a typical append node looks like this in the plan tree:
32 : : *
33 : : * ...
34 : : * /
35 : : * Append -------+------+------+--- nil
36 : : * / \ | | |
37 : : * nil nil ... ... ...
38 : : * subplans
39 : : *
40 : : * Append nodes are currently used for unions, and to support
41 : : * inheritance queries, where several relations need to be scanned.
42 : : * For example, in our standard person/student/employee/student-emp
43 : : * example, where student and employee inherit from person
44 : : * and student-emp inherits from student and employee, the
45 : : * query:
46 : : *
47 : : * select name from person
48 : : *
49 : : * generates the plan:
50 : : *
51 : : * |
52 : : * Append -------+-------+--------+--------+
53 : : * / \ | | | |
54 : : * nil nil Scan Scan Scan Scan
55 : : * | | | |
56 : : * person employee student student-emp
57 : : */
58 : :
59 : : #include "postgres.h"
60 : :
61 : : #include "executor/execAsync.h"
62 : : #include "executor/execPartition.h"
63 : : #include "executor/executor.h"
64 : : #include "executor/nodeAppend.h"
65 : : #include "miscadmin.h"
66 : : #include "pgstat.h"
67 : : #include "storage/latch.h"
68 : : #include "storage/lwlock.h"
69 : : #include "utils/wait_event.h"
70 : :
71 : : /* Shared state for parallel-aware Append. */
72 : : struct ParallelAppendState
73 : : {
74 : : LWLock pa_lock; /* mutual exclusion to choose next subplan */
75 : : int pa_next_plan; /* next plan to choose by any worker */
76 : :
77 : : /*
78 : : * pa_finished[i] should be true if no more workers should select subplan
79 : : * i. for a non-partial plan, this should be set to true as soon as a
80 : : * worker selects the plan; for a partial plan, it remains false until
81 : : * some worker executes the plan to completion.
82 : : */
83 : : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
84 : : };
85 : :
86 : : #define INVALID_SUBPLAN_INDEX -1
87 : : #define EVENT_BUFFER_SIZE 16
88 : :
89 : : static TupleTableSlot *ExecAppend(PlanState *pstate);
90 : : static bool choose_next_subplan_locally(AppendState *node);
91 : : static bool choose_next_subplan_for_leader(AppendState *node);
92 : : static bool choose_next_subplan_for_worker(AppendState *node);
93 : : static void mark_invalid_subplans_as_finished(AppendState *node);
94 : : static void ExecAppendAsyncBegin(AppendState *node);
95 : : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
96 : : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
97 : : static void ExecAppendAsyncEventWait(AppendState *node);
98 : : static void classify_matching_subplans(AppendState *node);
99 : :
100 : : /* ----------------------------------------------------------------
101 : : * ExecInitAppend
102 : : *
103 : : * Begin all of the subscans of the append node.
104 : : *
105 : : * (This is potentially wasteful, since the entire result of the
106 : : * append node may not be scanned, but this way all of the
107 : : * structures get allocated in the executor's top level memory
108 : : * block instead of that of the call to ExecAppend.)
109 : : * ----------------------------------------------------------------
110 : : */
111 : : AppendState *
7396 tgl@sss.pgh.pa.us 112 :CBC 12584 : ExecInitAppend(Append *node, EState *estate, int eflags)
113 : : {
8577 114 : 12584 : AppendState *appendstate = makeNode(AppendState);
115 : : PlanState **appendplanstates;
116 : : const TupleTableSlotOps *appendops;
117 : : Bitmapset *validsubplans;
118 : : Bitmapset *asyncplans;
119 : : int nplans;
120 : : int nasyncplans;
121 : : int firstvalid;
122 : : int i,
123 : : j;
124 : :
125 : : /* check for unsupported flags */
7396 126 [ - + ]: 12584 : Assert(!(eflags & EXEC_FLAG_MARK));
127 : :
128 : : /*
129 : : * create new AppendState for our append node
130 : : */
8577 131 : 12584 : appendstate->ps.plan = (Plan *) node;
132 : 12584 : appendstate->ps.state = estate;
3239 andres@anarazel.de 133 : 12584 : appendstate->ps.ExecProcNode = ExecAppend;
134 : :
135 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2975 alvherre@alvh.no-ip. 136 : 12584 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
1886 efujita@postgresql.o 137 : 12584 : appendstate->as_syncdone = false;
138 : 12584 : appendstate->as_begun = false;
139 : :
140 : : /* If run-time partition pruning is enabled, then set that up now */
485 amitlan@postgresql.o 141 [ + + ]: 12584 : if (node->part_prune_index >= 0)
142 : : {
143 : : PartitionPruneState *prunestate;
144 : :
145 : : /*
146 : : * Set up pruning data structure. This also initializes the set of
147 : : * subplans to initialize (validsubplans) by taking into account the
148 : : * result of performing initial pruning if any.
149 : : */
484 150 : 488 : prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
151 : 488 : list_length(node->appendplans),
152 : : node->part_prune_index,
153 : : node->apprelids,
154 : : &validsubplans);
2908 tgl@sss.pgh.pa.us 155 : 488 : appendstate->as_prune_state = prunestate;
1516 alvherre@alvh.no-ip. 156 : 488 : nplans = bms_num_members(validsubplans);
157 : :
158 : : /*
159 : : * When no run-time pruning is required and there's at least one
160 : : * subplan, we can fill as_valid_subplans immediately, preventing
161 : : * later calls to ExecFindMatchingSubPlans.
162 : : */
2362 tgl@sss.pgh.pa.us 163 [ + + + + ]: 488 : if (!prunestate->do_exec_prune && nplans > 0)
164 : : {
2975 alvherre@alvh.no-ip. 165 : 178 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
1185 tgl@sss.pgh.pa.us 166 : 178 : appendstate->as_valid_subplans_identified = true;
167 : : }
168 : : }
169 : : else
170 : : {
2975 alvherre@alvh.no-ip. 171 : 12096 : nplans = list_length(node->appendplans);
172 : :
173 : : /*
174 : : * When run-time partition pruning is not enabled we can just mark all
175 : : * subplans as valid; they must also all be initialized.
176 : : */
2861 177 [ - + ]: 12096 : Assert(nplans > 0);
2975 178 : 12096 : appendstate->as_valid_subplans = validsubplans =
179 : 12096 : bms_add_range(NULL, 0, nplans - 1);
1185 tgl@sss.pgh.pa.us 180 : 12096 : appendstate->as_valid_subplans_identified = true;
2975 alvherre@alvh.no-ip. 181 : 12096 : appendstate->as_prune_state = NULL;
182 : : }
183 : :
184 : 12584 : appendplanstates = (PlanState **) palloc(nplans *
185 : : sizeof(PlanState *));
186 : :
187 : : /*
188 : : * call ExecInitNode on each of the valid plans to be executed and save
189 : : * the results into the appendplanstates array.
190 : : *
191 : : * While at it, find out the first valid partial plan.
192 : : */
2504 drowley@postgresql.o 193 : 12584 : j = 0;
1886 efujita@postgresql.o 194 : 12584 : asyncplans = NULL;
195 : 12584 : nasyncplans = 0;
2965 alvherre@alvh.no-ip. 196 : 12584 : firstvalid = nplans;
2504 drowley@postgresql.o 197 : 12584 : i = -1;
198 [ + + ]: 52305 : while ((i = bms_next_member(validsubplans, i)) >= 0)
199 : : {
200 : 39721 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
201 : :
202 : : /*
203 : : * Record async subplans. When executing EvalPlanQual, we treat them
204 : : * as sync ones; don't do this when initializing an EvalPlanQual plan
205 : : * tree.
206 : : */
1886 efujita@postgresql.o 207 [ + + + - ]: 39721 : if (initNode->async_capable && estate->es_epq_active == NULL)
208 : : {
209 : 93 : asyncplans = bms_add_member(asyncplans, j);
210 : 93 : nasyncplans++;
211 : : }
212 : :
213 : : /*
214 : : * Record the lowest appendplans index which is a valid partial plan.
215 : : */
2504 drowley@postgresql.o 216 [ + + + + ]: 39721 : if (i >= node->first_partial_plan && j < firstvalid)
217 : 335 : firstvalid = j;
218 : :
219 : 39721 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
220 : : }
221 : :
2965 alvherre@alvh.no-ip. 222 : 12584 : appendstate->as_first_partial_plan = firstvalid;
2975 223 : 12584 : appendstate->appendplans = appendplanstates;
224 : 12584 : appendstate->as_nplans = nplans;
225 : :
226 : : /*
227 : : * Initialize Append's result tuple type and slot. If the child plans all
228 : : * produce the same fixed slot type, we can use that slot type; otherwise
229 : : * make a virtual slot. (Note that the result slot itself is used only to
230 : : * return a null tuple at end of execution; real tuples are returned to
231 : : * the caller in the children's own result slots. What we are doing here
232 : : * is allowing the parent plan node to optimize if the Append will return
233 : : * only one kind of slot.)
234 : : */
527 tgl@sss.pgh.pa.us 235 : 12584 : appendops = ExecGetCommonSlotOps(appendplanstates, j);
236 [ + + ]: 12584 : if (appendops != NULL)
237 : : {
238 : 11956 : ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
239 : : }
240 : : else
241 : : {
242 : 628 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
243 : : /* show that the output slot type is not fixed */
244 : 628 : appendstate->ps.resultopsset = true;
245 : 628 : appendstate->ps.resultopsfixed = false;
246 : : }
247 : :
248 : : /* Initialize async state */
1886 efujita@postgresql.o 249 : 12584 : appendstate->as_asyncplans = asyncplans;
250 : 12584 : appendstate->as_nasyncplans = nasyncplans;
251 : 12584 : appendstate->as_asyncrequests = NULL;
1818 252 : 12584 : appendstate->as_asyncresults = NULL;
253 : 12584 : appendstate->as_nasyncresults = 0;
254 : 12584 : appendstate->as_nasyncremain = 0;
1886 255 : 12584 : appendstate->as_needrequest = NULL;
256 : 12584 : appendstate->as_eventset = NULL;
1818 257 : 12584 : appendstate->as_valid_asyncplans = NULL;
258 : :
1886 259 [ + + ]: 12584 : if (nasyncplans > 0)
260 : : {
261 : 47 : appendstate->as_asyncrequests = (AsyncRequest **)
262 : 47 : palloc0(nplans * sizeof(AsyncRequest *));
263 : :
264 : 47 : i = -1;
265 [ + + ]: 140 : while ((i = bms_next_member(asyncplans, i)) >= 0)
266 : : {
267 : : AsyncRequest *areq;
268 : :
171 michael@paquier.xyz 269 :GNC 93 : areq = palloc_object(AsyncRequest);
1886 efujita@postgresql.o 270 :CBC 93 : areq->requestor = (PlanState *) appendstate;
271 : 93 : areq->requestee = appendplanstates[i];
272 : 93 : areq->request_index = i;
273 : 93 : areq->callback_pending = false;
274 : 93 : areq->request_complete = false;
275 : 93 : areq->result = NULL;
276 : :
277 : 93 : appendstate->as_asyncrequests[i] = areq;
278 : : }
279 : :
1818 280 : 47 : appendstate->as_asyncresults = (TupleTableSlot **)
281 : 47 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
282 : :
1185 tgl@sss.pgh.pa.us 283 [ + + ]: 47 : if (appendstate->as_valid_subplans_identified)
1818 efujita@postgresql.o 284 : 44 : classify_matching_subplans(appendstate);
285 : : }
286 : :
287 : : /*
288 : : * Miscellaneous initialization
289 : : */
290 : :
2975 alvherre@alvh.no-ip. 291 : 12584 : appendstate->ps.ps_ProjInfo = NULL;
292 : :
293 : : /* For parallel query, this will be overridden later. */
3098 rhaas@postgresql.org 294 : 12584 : appendstate->choose_next_subplan = choose_next_subplan_locally;
295 : :
8577 tgl@sss.pgh.pa.us 296 : 12584 : return appendstate;
297 : : }
298 : :
299 : : /* ----------------------------------------------------------------
300 : : * ExecAppend
301 : : *
302 : : * Handles iteration over multiple subplans.
303 : : * ----------------------------------------------------------------
304 : : */
305 : : static TupleTableSlot *
3239 andres@anarazel.de 306 : 1827841 : ExecAppend(PlanState *pstate)
307 : : {
308 : 1827841 : AppendState *node = castNode(AppendState, pstate);
309 : : TupleTableSlot *result;
310 : :
311 : : /*
312 : : * If this is the first call after Init or ReScan, we need to do the
313 : : * initialization work.
314 : : */
1886 efujita@postgresql.o 315 [ + + ]: 1827841 : if (!node->as_begun)
316 : : {
317 [ - + ]: 22206 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
318 [ - + ]: 22206 : Assert(!node->as_syncdone);
319 : :
320 : : /* Nothing to do if there are no subplans */
2362 tgl@sss.pgh.pa.us 321 [ + + ]: 22206 : if (node->as_nplans == 0)
322 : 40 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
323 : :
324 : : /* If there are any async subplans, begin executing them. */
1886 efujita@postgresql.o 325 [ + + ]: 22166 : if (node->as_nasyncplans > 0)
326 : 37 : ExecAppendAsyncBegin(node);
327 : :
328 : : /*
329 : : * If no sync subplan has been chosen, we must choose one before
330 : : * proceeding.
331 : : */
332 [ + + + + ]: 22166 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
2975 alvherre@alvh.no-ip. 333 : 2139 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
334 : :
1886 efujita@postgresql.o 335 [ + + + - : 20027 : Assert(node->as_syncdone ||
- + ]
336 : : (node->as_whichplan >= 0 &&
337 : : node->as_whichplan < node->as_nplans));
338 : :
339 : : /* And we're initialized. */
340 : 20027 : node->as_begun = true;
341 : : }
342 : :
343 : : for (;;)
10492 bruce@momjian.us 344 : 30212 : {
345 : : PlanState *subnode;
346 : :
3231 andres@anarazel.de 347 [ + + ]: 1855874 : CHECK_FOR_INTERRUPTS();
348 : :
349 : : /*
350 : : * try to get a tuple from an async subplan if any
351 : : */
1886 efujita@postgresql.o 352 [ + + - + ]: 1855874 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
353 : : {
354 [ + - ]: 6138 : if (ExecAppendAsyncGetNext(node, &result))
355 : 6137 : return result;
1886 efujita@postgresql.o 356 [ # # ]:UBC 0 : Assert(!node->as_syncdone);
357 [ # # ]: 0 : Assert(bms_is_empty(node->as_needrequest));
358 : : }
359 : :
360 : : /*
361 : : * figure out which sync subplan we are currently processing
362 : : */
3098 rhaas@postgresql.org 363 [ + - - + ]:CBC 1849736 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
7678 tgl@sss.pgh.pa.us 364 : 1849736 : subnode = node->appendplans[node->as_whichplan];
365 : :
366 : : /*
367 : : * get a tuple from the subplan
368 : : */
369 : 1849736 : result = ExecProcNode(subnode);
370 : :
371 [ + + + + ]: 1849702 : if (!TupIsNull(result))
372 : : {
373 : : /*
374 : : * If the subplan gave us something then return it as-is. We do
375 : : * NOT make use of the result slot that was set up in
376 : : * ExecInitAppend; there's no need for it.
377 : : */
378 : 1799879 : return result;
379 : : }
380 : :
381 : : /*
382 : : * wait or poll for async events if any. We do this before checking
383 : : * for the end of iteration, because it might drain the remaining
384 : : * async subplans.
385 : : */
1886 efujita@postgresql.o 386 [ + + ]: 49823 : if (node->as_nasyncremain > 0)
387 : 17 : ExecAppendAsyncEventWait(node);
388 : :
389 : : /* choose new sync subplan; if no sync/async subplans, we're done */
390 [ + + + + ]: 49823 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
7678 tgl@sss.pgh.pa.us 391 : 19611 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
392 : : }
393 : : }
394 : :
395 : : /* ----------------------------------------------------------------
396 : : * ExecEndAppend
397 : : *
398 : : * Shuts down the subscans of the append node.
399 : : *
400 : : * Returns nothing of interest.
401 : : * ----------------------------------------------------------------
402 : : */
403 : : void
8577 404 : 12383 : ExecEndAppend(AppendState *node)
405 : : {
406 : : PlanState **appendplans;
407 : : int nplans;
408 : : int i;
409 : :
410 : : /*
411 : : * get information from the node
412 : : */
10181 bruce@momjian.us 413 : 12383 : appendplans = node->appendplans;
8577 tgl@sss.pgh.pa.us 414 : 12383 : nplans = node->as_nplans;
415 : :
416 : : /*
417 : : * shut down each of the subscans
418 : : */
10492 bruce@momjian.us 419 [ + + ]: 51624 : for (i = 0; i < nplans; i++)
6076 tgl@sss.pgh.pa.us 420 : 39241 : ExecEndNode(appendplans[i]);
10492 bruce@momjian.us 421 : 12383 : }
422 : :
423 : : void
5801 tgl@sss.pgh.pa.us 424 : 13379 : ExecReScanAppend(AppendState *node)
425 : : {
1886 efujita@postgresql.o 426 : 13379 : int nasyncplans = node->as_nasyncplans;
427 : : int i;
428 : :
429 : : /*
430 : : * If any PARAM_EXEC Params used in pruning expressions have changed, then
431 : : * we'd better unset the valid subplans so that they are reselected for
432 : : * the new parameter values.
433 : : */
2975 alvherre@alvh.no-ip. 434 [ + + + + ]: 15559 : if (node->as_prune_state &&
435 : 2180 : bms_overlap(node->ps.chgParam,
2911 tgl@sss.pgh.pa.us 436 : 2180 : node->as_prune_state->execparamids))
437 : : {
1185 438 : 2178 : node->as_valid_subplans_identified = false;
2975 alvherre@alvh.no-ip. 439 : 2178 : bms_free(node->as_valid_subplans);
440 : 2178 : node->as_valid_subplans = NULL;
1185 tgl@sss.pgh.pa.us 441 : 2178 : bms_free(node->as_valid_asyncplans);
442 : 2178 : node->as_valid_asyncplans = NULL;
443 : : }
444 : :
6076 445 [ + + ]: 55542 : for (i = 0; i < node->as_nplans; i++)
446 : : {
8335 bruce@momjian.us 447 : 42163 : PlanState *subnode = node->appendplans[i];
448 : :
449 : : /*
450 : : * ExecReScan doesn't know about my subplans, so I have to do
451 : : * changed-parameter signaling myself.
452 : : */
8511 tgl@sss.pgh.pa.us 453 [ + + ]: 42163 : if (node->ps.chgParam != NULL)
454 : 37423 : UpdateChangedParamSet(subnode, node->ps.chgParam);
455 : :
456 : : /*
457 : : * If chgParam of subnode is not null then plan will be re-scanned by
458 : : * first ExecProcNode or by first ExecAsyncRequest.
459 : : */
5801 460 [ + + ]: 42163 : if (subnode->chgParam == NULL)
461 : 12425 : ExecReScan(subnode);
462 : : }
463 : :
464 : : /* Reset async state */
1886 efujita@postgresql.o 465 [ + + ]: 13379 : if (nasyncplans > 0)
466 : : {
467 : 17 : i = -1;
468 [ + + ]: 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
469 : : {
470 : 34 : AsyncRequest *areq = node->as_asyncrequests[i];
471 : :
472 : 34 : areq->callback_pending = false;
473 : 34 : areq->request_complete = false;
474 : 34 : areq->result = NULL;
475 : : }
476 : :
1818 477 : 17 : node->as_nasyncresults = 0;
478 : 17 : node->as_nasyncremain = 0;
1886 479 : 17 : bms_free(node->as_needrequest);
480 : 17 : node->as_needrequest = NULL;
481 : : }
482 : :
483 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2975 alvherre@alvh.no-ip. 484 : 13379 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
1886 efujita@postgresql.o 485 : 13379 : node->as_syncdone = false;
486 : 13379 : node->as_begun = false;
3098 rhaas@postgresql.org 487 : 13379 : }
488 : :
489 : : /* ----------------------------------------------------------------
490 : : * Parallel Append Support
491 : : * ----------------------------------------------------------------
492 : : */
493 : :
494 : : /* ----------------------------------------------------------------
495 : : * ExecAppendEstimate
496 : : *
497 : : * Compute the amount of space we'll need in the parallel
498 : : * query DSM, and inform pcxt->estimator about our needs.
499 : : * ----------------------------------------------------------------
500 : : */
501 : : void
502 : 108 : ExecAppendEstimate(AppendState *node,
503 : : ParallelContext *pcxt)
504 : : {
505 : 108 : node->pstate_len =
506 : 108 : add_size(offsetof(ParallelAppendState, pa_finished),
507 : 108 : sizeof(bool) * node->as_nplans);
508 : :
509 : 108 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
510 : 108 : shm_toc_estimate_keys(&pcxt->estimator, 1);
511 : 108 : }
512 : :
513 : :
514 : : /* ----------------------------------------------------------------
515 : : * ExecAppendInitializeDSM
516 : : *
517 : : * Set up shared state for Parallel Append.
518 : : * ----------------------------------------------------------------
519 : : */
520 : : void
521 : 108 : ExecAppendInitializeDSM(AppendState *node,
522 : : ParallelContext *pcxt)
523 : : {
524 : : ParallelAppendState *pstate;
525 : :
526 : 108 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
527 : 108 : memset(pstate, 0, node->pstate_len);
528 : 108 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
529 : 108 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
530 : :
531 : 108 : node->as_pstate = pstate;
532 : 108 : node->choose_next_subplan = choose_next_subplan_for_leader;
533 : 108 : }
534 : :
535 : : /* ----------------------------------------------------------------
536 : : * ExecAppendReInitializeDSM
537 : : *
538 : : * Reset shared state before beginning a fresh scan.
539 : : * ----------------------------------------------------------------
540 : : */
541 : : void
3098 rhaas@postgresql.org 542 :UBC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
543 : : {
544 : 0 : ParallelAppendState *pstate = node->as_pstate;
545 : :
546 : 0 : pstate->pa_next_plan = 0;
547 : 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
548 : 0 : }
549 : :
550 : : /* ----------------------------------------------------------------
551 : : * ExecAppendInitializeWorker
552 : : *
553 : : * Copy relevant information from TOC into planstate, and initialize
554 : : * whatever is required to choose and execute the optimal subplan.
555 : : * ----------------------------------------------------------------
556 : : */
557 : : void
3098 rhaas@postgresql.org 558 :CBC 243 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
559 : : {
560 : 243 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
561 : 243 : node->choose_next_subplan = choose_next_subplan_for_worker;
562 : 243 : }
563 : :
564 : : /* ----------------------------------------------------------------
565 : : * choose_next_subplan_locally
566 : : *
567 : : * Choose next sync subplan for a non-parallel-aware Append,
568 : : * returning false if there are no more.
569 : : * ----------------------------------------------------------------
570 : : */
571 : : static bool
572 : 70685 : choose_next_subplan_locally(AppendState *node)
573 : : {
574 : 70685 : int whichplan = node->as_whichplan;
575 : : int nextplan;
576 : :
577 : : /* We should never be called when there are no subplans */
2362 tgl@sss.pgh.pa.us 578 [ - + ]: 70685 : Assert(node->as_nplans > 0);
579 : :
580 : : /* Nothing to do if syncdone */
1886 efujita@postgresql.o 581 [ + + ]: 70685 : if (node->as_syncdone)
582 : 18 : return false;
583 : :
584 : : /*
585 : : * If first call then have the bms member function choose the first valid
586 : : * sync subplan by initializing whichplan to -1. If there happen to be no
587 : : * valid sync subplans then the bms member function will handle that by
588 : : * returning a negative number which will allow us to exit returning a
589 : : * false value.
590 : : */
2975 alvherre@alvh.no-ip. 591 [ + + ]: 70667 : if (whichplan == INVALID_SUBPLAN_INDEX)
592 : : {
1886 efujita@postgresql.o 593 [ + + ]: 21833 : if (node->as_nasyncplans > 0)
594 : : {
595 : : /* We'd have filled as_valid_subplans already */
1185 tgl@sss.pgh.pa.us 596 [ - + ]: 19 : Assert(node->as_valid_subplans_identified);
597 : : }
598 [ + + ]: 21814 : else if (!node->as_valid_subplans_identified)
599 : : {
2975 alvherre@alvh.no-ip. 600 : 2255 : node->as_valid_subplans =
477 amitlan@postgresql.o 601 : 2255 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1185 tgl@sss.pgh.pa.us 602 : 2255 : node->as_valid_subplans_identified = true;
603 : : }
604 : :
2975 alvherre@alvh.no-ip. 605 : 21833 : whichplan = -1;
606 : : }
607 : :
608 : : /* Ensure whichplan is within the expected range */
609 [ + - - + ]: 70667 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
610 : :
611 [ + + ]: 70667 : if (ScanDirectionIsForward(node->ps.state->es_direction))
612 : 70655 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
613 : : else
614 : 12 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
615 : :
616 [ + + ]: 70667 : if (nextplan < 0)
617 : : {
618 : : /* Set as_syncdone if in async mode */
1886 efujita@postgresql.o 619 [ + + ]: 21452 : if (node->as_nasyncplans > 0)
620 : 17 : node->as_syncdone = true;
2975 alvherre@alvh.no-ip. 621 : 21452 : return false;
622 : : }
623 : :
624 : 49215 : node->as_whichplan = nextplan;
625 : :
3098 rhaas@postgresql.org 626 : 49215 : return true;
627 : : }
628 : :
629 : : /* ----------------------------------------------------------------
630 : : * choose_next_subplan_for_leader
631 : : *
632 : : * Try to pick a plan which doesn't commit us to doing much
633 : : * work locally, so that as much work as possible is done in
634 : : * the workers. Cheapest subplans are at the end.
635 : : * ----------------------------------------------------------------
636 : : */
637 : : static bool
638 : 948 : choose_next_subplan_for_leader(AppendState *node)
639 : : {
640 : 948 : ParallelAppendState *pstate = node->as_pstate;
641 : :
642 : : /* Backward scan is not supported by parallel-aware plans */
643 [ - + ]: 948 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
644 : :
645 : : /* We should never be called when there are no subplans */
2362 tgl@sss.pgh.pa.us 646 [ - + ]: 948 : Assert(node->as_nplans > 0);
647 : :
3098 rhaas@postgresql.org 648 : 948 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
649 : :
650 [ + + ]: 948 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
651 : : {
652 : : /* Mark just-completed subplan as finished. */
653 : 852 : node->as_pstate->pa_finished[node->as_whichplan] = true;
654 : : }
655 : : else
656 : : {
657 : : /* Start with last subplan. */
658 : 96 : node->as_whichplan = node->as_nplans - 1;
659 : :
660 : : /*
661 : : * If we've yet to determine the valid subplans then do so now. If
662 : : * run-time pruning is disabled then the valid subplans will always be
663 : : * set to all subplans.
664 : : */
1185 tgl@sss.pgh.pa.us 665 [ + + ]: 96 : if (!node->as_valid_subplans_identified)
666 : : {
2975 alvherre@alvh.no-ip. 667 : 16 : node->as_valid_subplans =
477 amitlan@postgresql.o 668 : 16 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1185 tgl@sss.pgh.pa.us 669 : 16 : node->as_valid_subplans_identified = true;
670 : :
671 : : /*
672 : : * Mark each invalid plan as finished to allow the loop below to
673 : : * select the first valid subplan.
674 : : */
2975 alvherre@alvh.no-ip. 675 : 16 : mark_invalid_subplans_as_finished(node);
676 : : }
677 : : }
678 : :
679 : : /* Loop until we find a subplan to execute. */
3098 rhaas@postgresql.org 680 [ + + ]: 1748 : while (pstate->pa_finished[node->as_whichplan])
681 : : {
682 [ + + ]: 896 : if (node->as_whichplan == 0)
683 : : {
684 : 96 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
685 : 96 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
686 : 96 : LWLockRelease(&pstate->pa_lock);
687 : 96 : return false;
688 : : }
689 : :
690 : : /*
691 : : * We needn't pay attention to as_valid_subplans here as all invalid
692 : : * plans have been marked as finished.
693 : : */
694 : 800 : node->as_whichplan--;
695 : : }
696 : :
697 : : /* If non-partial, immediately mark as finished. */
2965 alvherre@alvh.no-ip. 698 [ + + ]: 852 : if (node->as_whichplan < node->as_first_partial_plan)
3098 rhaas@postgresql.org 699 : 112 : node->as_pstate->pa_finished[node->as_whichplan] = true;
700 : :
701 : 852 : LWLockRelease(&pstate->pa_lock);
702 : :
703 : 852 : return true;
704 : : }
705 : :
706 : : /* ----------------------------------------------------------------
707 : : * choose_next_subplan_for_worker
708 : : *
709 : : * Choose next subplan for a parallel-aware Append, returning
710 : : * false if there are no more.
711 : : *
712 : : * We start from the first plan and advance through the list;
713 : : * when we get back to the end, we loop back to the first
714 : : * partial plan. This assigns the non-partial plans first in
715 : : * order of descending cost and then spreads out the workers
716 : : * as evenly as possible across the remaining partial plans.
717 : : * ----------------------------------------------------------------
718 : : */
719 : : static bool
720 : 356 : choose_next_subplan_for_worker(AppendState *node)
721 : : {
722 : 356 : ParallelAppendState *pstate = node->as_pstate;
723 : :
724 : : /* Backward scan is not supported by parallel-aware plans */
725 [ - + ]: 356 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
726 : :
727 : : /* We should never be called when there are no subplans */
2362 tgl@sss.pgh.pa.us 728 [ - + ]: 356 : Assert(node->as_nplans > 0);
729 : :
3098 rhaas@postgresql.org 730 : 356 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
731 : :
732 : : /* Mark just-completed subplan as finished. */
733 [ + + ]: 356 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
734 : 137 : node->as_pstate->pa_finished[node->as_whichplan] = true;
735 : :
736 : : /*
737 : : * If we've yet to determine the valid subplans then do so now. If
738 : : * run-time pruning is disabled then the valid subplans will always be set
739 : : * to all subplans.
740 : : */
1185 tgl@sss.pgh.pa.us 741 [ + + ]: 219 : else if (!node->as_valid_subplans_identified)
742 : : {
2975 alvherre@alvh.no-ip. 743 : 16 : node->as_valid_subplans =
477 amitlan@postgresql.o 744 : 16 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1185 tgl@sss.pgh.pa.us 745 : 16 : node->as_valid_subplans_identified = true;
746 : :
2975 alvherre@alvh.no-ip. 747 : 16 : mark_invalid_subplans_as_finished(node);
748 : : }
749 : :
750 : : /* If all the plans are already done, we have nothing to do */
3098 rhaas@postgresql.org 751 [ + + ]: 356 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
752 : : {
753 : 196 : LWLockRelease(&pstate->pa_lock);
754 : 196 : return false;
755 : : }
756 : :
757 : : /* Save the plan from which we are starting the search. */
3033 758 : 160 : node->as_whichplan = pstate->pa_next_plan;
759 : :
760 : : /* Loop until we find a valid subplan to execute. */
3098 761 [ + + ]: 267 : while (pstate->pa_finished[pstate->pa_next_plan])
762 : : {
763 : : int nextplan;
764 : :
2973 alvherre@alvh.no-ip. 765 : 130 : nextplan = bms_next_member(node->as_valid_subplans,
766 : : pstate->pa_next_plan);
767 [ + + ]: 130 : if (nextplan >= 0)
768 : : {
769 : : /* Advance to the next valid plan. */
770 : 90 : pstate->pa_next_plan = nextplan;
771 : : }
2965 772 [ + + ]: 40 : else if (node->as_whichplan > node->as_first_partial_plan)
773 : : {
774 : : /*
775 : : * Try looping back to the first valid partial plan, if there is
776 : : * one. If there isn't, arrange to bail out below.
777 : : */
2973 778 : 31 : nextplan = bms_next_member(node->as_valid_subplans,
2965 779 : 31 : node->as_first_partial_plan - 1);
2973 780 : 31 : pstate->pa_next_plan =
781 [ - + ]: 31 : nextplan < 0 ? node->as_whichplan : nextplan;
782 : : }
783 : : else
784 : : {
785 : : /*
786 : : * At last plan, and either there are no partial plans or we've
787 : : * tried them all. Arrange to bail out.
788 : : */
3098 rhaas@postgresql.org 789 : 9 : pstate->pa_next_plan = node->as_whichplan;
790 : : }
791 : :
792 [ + + ]: 130 : if (pstate->pa_next_plan == node->as_whichplan)
793 : : {
794 : : /* We've tried everything! */
795 : 23 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
796 : 23 : LWLockRelease(&pstate->pa_lock);
797 : 23 : return false;
798 : : }
799 : : }
800 : :
801 : : /* Pick the plan we found, and advance pa_next_plan one more time. */
2973 alvherre@alvh.no-ip. 802 : 137 : node->as_whichplan = pstate->pa_next_plan;
803 : 137 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
804 : : pstate->pa_next_plan);
805 : :
806 : : /*
807 : : * If there are no more valid plans then try setting the next plan to the
808 : : * first valid partial plan.
809 : : */
810 [ + + ]: 137 : if (pstate->pa_next_plan < 0)
811 : : {
812 : 20 : int nextplan = bms_next_member(node->as_valid_subplans,
2965 813 : 20 : node->as_first_partial_plan - 1);
814 : :
2973 815 [ + - ]: 20 : if (nextplan >= 0)
816 : 20 : pstate->pa_next_plan = nextplan;
817 : : else
818 : : {
819 : : /*
820 : : * There are no valid partial plans, and we already chose the last
821 : : * non-partial plan; so flag that there's nothing more for our
822 : : * fellow workers to do.
823 : : */
3097 rhaas@postgresql.org 824 :UBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
825 : : }
826 : : }
827 : :
828 : : /* If non-partial, immediately mark as finished. */
2965 alvherre@alvh.no-ip. 829 [ + + ]:CBC 137 : if (node->as_whichplan < node->as_first_partial_plan)
3098 rhaas@postgresql.org 830 : 28 : node->as_pstate->pa_finished[node->as_whichplan] = true;
831 : :
832 : 137 : LWLockRelease(&pstate->pa_lock);
833 : :
834 : 137 : return true;
835 : : }
836 : :
837 : : /*
838 : : * mark_invalid_subplans_as_finished
839 : : * Marks the ParallelAppendState's pa_finished as true for each invalid
840 : : * subplan.
841 : : *
842 : : * This function should only be called for parallel Append with run-time
843 : : * pruning enabled.
844 : : */
845 : : static void
2975 alvherre@alvh.no-ip. 846 : 32 : mark_invalid_subplans_as_finished(AppendState *node)
847 : : {
848 : : int i;
849 : :
850 : : /* Only valid to call this while in parallel Append mode */
851 [ - + ]: 32 : Assert(node->as_pstate);
852 : :
853 : : /* Shouldn't have been called when run-time pruning is not enabled */
854 [ - + ]: 32 : Assert(node->as_prune_state);
855 : :
856 : : /* Nothing to do if all plans are valid */
857 [ - + ]: 32 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
2975 alvherre@alvh.no-ip. 858 :UBC 0 : return;
859 : :
860 : : /* Mark all non-valid plans as finished */
2975 alvherre@alvh.no-ip. 861 [ + + ]:CBC 108 : for (i = 0; i < node->as_nplans; i++)
862 : : {
863 [ + + ]: 76 : if (!bms_is_member(i, node->as_valid_subplans))
864 : 32 : node->as_pstate->pa_finished[i] = true;
865 : : }
866 : : }
867 : :
868 : : /* ----------------------------------------------------------------
869 : : * Asynchronous Append Support
870 : : * ----------------------------------------------------------------
871 : : */
872 : :
873 : : /* ----------------------------------------------------------------
874 : : * ExecAppendAsyncBegin
875 : : *
876 : : * Begin executing designed async-capable subplans.
877 : : * ----------------------------------------------------------------
878 : : */
879 : : static void
1886 efujita@postgresql.o 880 : 37 : ExecAppendAsyncBegin(AppendState *node)
881 : : {
882 : : int i;
883 : :
884 : : /* Backward scan is not supported by async-aware Appends. */
885 [ - + ]: 37 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
886 : :
887 : : /* We should never be called when there are no subplans */
1818 888 [ - + ]: 37 : Assert(node->as_nplans > 0);
889 : :
890 : : /* We should never be called when there are no async subplans. */
1886 891 [ - + ]: 37 : Assert(node->as_nasyncplans > 0);
892 : :
893 : : /* If we've yet to determine the valid subplans then do so now. */
1185 tgl@sss.pgh.pa.us 894 [ + + ]: 37 : if (!node->as_valid_subplans_identified)
895 : : {
1886 efujita@postgresql.o 896 : 2 : node->as_valid_subplans =
477 amitlan@postgresql.o 897 : 2 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1185 tgl@sss.pgh.pa.us 898 : 2 : node->as_valid_subplans_identified = true;
899 : :
1818 efujita@postgresql.o 900 : 2 : classify_matching_subplans(node);
901 : : }
902 : :
903 : : /* Initialize state variables. */
904 : 37 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
905 : 37 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
906 : :
907 : : /* Nothing to do if there are no valid async subplans. */
1886 908 [ - + ]: 37 : if (node->as_nasyncremain == 0)
1886 efujita@postgresql.o 909 :UBC 0 : return;
910 : :
911 : : /* Make a request for each of the valid async subplans. */
1886 efujita@postgresql.o 912 :CBC 37 : i = -1;
913 [ + + ]: 109 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
914 : : {
915 : 72 : AsyncRequest *areq = node->as_asyncrequests[i];
916 : :
917 [ - + ]: 72 : Assert(areq->request_index == i);
918 [ - + ]: 72 : Assert(!areq->callback_pending);
919 : :
920 : : /* Do the actual work. */
921 : 72 : ExecAsyncRequest(areq);
922 : : }
923 : : }
924 : :
925 : : /* ----------------------------------------------------------------
926 : : * ExecAppendAsyncGetNext
927 : : *
928 : : * Get the next tuple from any of the asynchronous subplans.
929 : : * ----------------------------------------------------------------
930 : : */
931 : : static bool
932 : 6138 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
933 : : {
934 : 6138 : *result = NULL;
935 : :
936 : : /* We should never be called when there are no valid async subplans. */
937 [ - + ]: 6138 : Assert(node->as_nasyncremain > 0);
938 : :
939 : : /* Request a tuple asynchronously. */
940 [ + + ]: 6138 : if (ExecAppendAsyncRequest(node, result))
941 : 6038 : return true;
942 : :
943 [ + + ]: 149 : while (node->as_nasyncremain > 0)
944 : : {
945 [ - + ]: 118 : CHECK_FOR_INTERRUPTS();
946 : :
947 : : /* Wait or poll for async events. */
948 : 118 : ExecAppendAsyncEventWait(node);
949 : :
950 : : /* Request a tuple asynchronously. */
951 [ + + ]: 117 : if (ExecAppendAsyncRequest(node, result))
952 : 68 : return true;
953 : :
954 : : /* Break from loop if there's any sync subplan that isn't complete. */
955 [ - + ]: 49 : if (!node->as_syncdone)
1886 efujita@postgresql.o 956 :UBC 0 : break;
957 : : }
958 : :
959 : : /*
960 : : * If all sync subplans are complete, we're totally done scanning the
961 : : * given node. Otherwise, we're done with the asynchronous stuff but must
962 : : * continue scanning the sync subplans.
963 : : */
1886 efujita@postgresql.o 964 [ + - ]:CBC 31 : if (node->as_syncdone)
965 : : {
966 [ - + ]: 31 : Assert(node->as_nasyncremain == 0);
967 : 31 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
968 : 31 : return true;
969 : : }
970 : :
1886 efujita@postgresql.o 971 :UBC 0 : return false;
972 : : }
973 : :
974 : : /* ----------------------------------------------------------------
975 : : * ExecAppendAsyncRequest
976 : : *
977 : : * Request a tuple asynchronously.
978 : : * ----------------------------------------------------------------
979 : : */
980 : : static bool
1886 efujita@postgresql.o 981 :CBC 6255 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
982 : : {
983 : : Bitmapset *needrequest;
984 : : int i;
985 : :
986 : : /* Nothing to do if there are no async subplans needing a new request. */
987 [ + + ]: 6255 : if (bms_is_empty(node->as_needrequest))
988 : : {
1863 989 [ - + ]: 72 : Assert(node->as_nasyncresults == 0);
1886 990 : 72 : return false;
991 : : }
992 : :
993 : : /*
994 : : * If there are any asynchronously-generated results that have not yet
995 : : * been returned, we have nothing to do; just return one of them.
996 : : */
997 [ + + ]: 6183 : if (node->as_nasyncresults > 0)
998 : : {
999 : 983 : --node->as_nasyncresults;
1000 : 983 : *result = node->as_asyncresults[node->as_nasyncresults];
1001 : 983 : return true;
1002 : : }
1003 : :
1004 : : /* Make a new request for each of the async subplans that need it. */
1005 : 5200 : needrequest = node->as_needrequest;
1006 : 5200 : node->as_needrequest = NULL;
1007 : 5200 : i = -1;
1008 [ + + ]: 11303 : while ((i = bms_next_member(needrequest, i)) >= 0)
1009 : : {
1010 : 6103 : AsyncRequest *areq = node->as_asyncrequests[i];
1011 : :
1012 : : /* Do the actual work. */
1013 : 6103 : ExecAsyncRequest(areq);
1014 : : }
1015 : 5200 : bms_free(needrequest);
1016 : :
1017 : : /* Return one of the asynchronously-generated results if any. */
1018 [ + + ]: 5200 : if (node->as_nasyncresults > 0)
1019 : : {
1020 : 5123 : --node->as_nasyncresults;
1021 : 5123 : *result = node->as_asyncresults[node->as_nasyncresults];
1022 : 5123 : return true;
1023 : : }
1024 : :
1025 : 77 : return false;
1026 : : }
1027 : :
1028 : : /* ----------------------------------------------------------------
1029 : : * ExecAppendAsyncEventWait
1030 : : *
1031 : : * Wait or poll for file descriptor events and fire callbacks.
1032 : : * ----------------------------------------------------------------
1033 : : */
1034 : : static void
1035 : 135 : ExecAppendAsyncEventWait(AppendState *node)
1036 : : {
444 heikki.linnakangas@i 1037 : 135 : int nevents = node->as_nasyncplans + 2;
1886 efujita@postgresql.o 1038 [ + + ]: 135 : long timeout = node->as_syncdone ? -1 : 0;
1039 : : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1040 : : int noccurred;
1041 : : int i;
1042 : :
1043 : : /* We should never be called when there are no valid async subplans. */
1044 [ - + ]: 135 : Assert(node->as_nasyncremain > 0);
1045 : :
919 heikki.linnakangas@i 1046 [ - + ]: 135 : Assert(node->as_eventset == NULL);
1047 : 135 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1886 efujita@postgresql.o 1048 : 135 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1049 : : NULL, NULL);
1050 : :
1051 : : /* Give each waiting subplan a chance to add an event. */
1052 : 135 : i = -1;
1053 [ + + ]: 411 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1054 : : {
1055 : 277 : AsyncRequest *areq = node->as_asyncrequests[i];
1056 : :
1057 [ + + ]: 277 : if (areq->callback_pending)
1058 : 234 : ExecAsyncConfigureWait(areq);
1059 : : }
1060 : :
1061 : : /*
1062 : : * No need for further processing if none of the subplans configured any
1063 : : * events.
1064 : : */
1765 1065 [ + + ]: 134 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1066 : : {
1067 : 1 : FreeWaitEventSet(node->as_eventset);
1068 : 1 : node->as_eventset = NULL;
1069 : 5 : return;
1070 : : }
1071 : :
1072 : : /*
1073 : : * Add the process latch to the set, so that we wake up to process the
1074 : : * standard interrupts with CHECK_FOR_INTERRUPTS().
1075 : : *
1076 : : * NOTE: For historical reasons, it's important that this is added to the
1077 : : * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1078 : : * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1079 : : * any other events are in the set. That's a poor design, it's
1080 : : * questionable for postgres_fdw to be doing that in the first place, but
1081 : : * we cannot change it now. The pattern has possibly been copied to other
1082 : : * extensions too.
1083 : : */
444 heikki.linnakangas@i 1084 : 133 : AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
1085 : : MyLatch, NULL);
1086 : :
1087 : : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1863 efujita@postgresql.o 1088 [ - + ]: 133 : if (nevents > EVENT_BUFFER_SIZE)
1863 efujita@postgresql.o 1089 :UBC 0 : nevents = EVENT_BUFFER_SIZE;
1090 : :
1091 : : /*
1092 : : * If the timeout is -1, wait until at least one event occurs. If the
1093 : : * timeout is 0, poll for events, but do not wait at all.
1094 : : */
1886 efujita@postgresql.o 1095 :CBC 133 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1096 : : nevents, WAIT_EVENT_APPEND_READY);
1097 : 133 : FreeWaitEventSet(node->as_eventset);
1098 : 133 : node->as_eventset = NULL;
1099 [ + + ]: 133 : if (noccurred == 0)
1100 : 4 : return;
1101 : :
1102 : : /* Deliver notifications. */
1103 [ + + ]: 278 : for (i = 0; i < noccurred; i++)
1104 : : {
1105 : 149 : WaitEvent *w = &occurred_event[i];
1106 : :
1107 : : /*
1108 : : * Each waiting subplan should have registered its wait event with
1109 : : * user_data pointing back to its AsyncRequest.
1110 : : */
1111 [ + - ]: 149 : if ((w->events & WL_SOCKET_READABLE) != 0)
1112 : : {
1113 : 149 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1114 : :
1762 1115 [ + - ]: 149 : if (areq->callback_pending)
1116 : : {
1117 : : /*
1118 : : * Mark it as no longer needing a callback. We must do this
1119 : : * before dispatching the callback in case the callback resets
1120 : : * the flag.
1121 : : */
1122 : 149 : areq->callback_pending = false;
1123 : :
1124 : : /* Do the actual work. */
1125 : 149 : ExecAsyncNotify(areq);
1126 : : }
1127 : : }
1128 : :
1129 : : /* Handle standard interrupts */
444 heikki.linnakangas@i 1130 [ - + ]: 149 : if ((w->events & WL_LATCH_SET) != 0)
1131 : : {
444 heikki.linnakangas@i 1132 :UBC 0 : ResetLatch(MyLatch);
1133 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
1134 : : }
1135 : : }
1136 : : }
1137 : :
1138 : : /* ----------------------------------------------------------------
1139 : : * ExecAsyncAppendResponse
1140 : : *
1141 : : * Receive a response from an asynchronous request we made.
1142 : : * ----------------------------------------------------------------
1143 : : */
1144 : : void
1886 efujita@postgresql.o 1145 :CBC 6329 : ExecAsyncAppendResponse(AsyncRequest *areq)
1146 : : {
1147 : 6329 : AppendState *node = (AppendState *) areq->requestor;
1148 : 6329 : TupleTableSlot *slot = areq->result;
1149 : :
1150 : : /* The result should be a TupleTableSlot or NULL. */
1151 [ + + - + ]: 6329 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1152 : :
1153 : : /* Nothing to do if the request is pending. */
1154 [ + + ]: 6329 : if (!areq->request_complete)
1155 : : {
1156 : : /* The request would have been pending for a callback. */
1157 [ - + ]: 161 : Assert(areq->callback_pending);
1158 : 161 : return;
1159 : : }
1160 : :
1161 : : /* If the result is NULL or an empty slot, there's nothing more to do. */
1162 [ + + - + ]: 6168 : if (TupIsNull(slot))
1163 : : {
1164 : : /* The ending subplan wouldn't have been pending for a callback. */
1165 [ - + ]: 61 : Assert(!areq->callback_pending);
1166 : 61 : --node->as_nasyncremain;
1167 : 61 : return;
1168 : : }
1169 : :
1170 : : /* Save result so we can return it. */
1171 [ - + ]: 6107 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1172 : 6107 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1173 : :
1174 : : /*
1175 : : * Mark the subplan that returned a result as ready for a new request. We
1176 : : * don't launch another one here immediately because it might complete.
1177 : : */
1178 : 6107 : node->as_needrequest = bms_add_member(node->as_needrequest,
1179 : : areq->request_index);
1180 : : }
1181 : :
1182 : : /* ----------------------------------------------------------------
1183 : : * classify_matching_subplans
1184 : : *
1185 : : * Classify the node's as_valid_subplans into sync ones and
1186 : : * async ones, adjust it to contain sync ones only, and save
1187 : : * async ones in the node's as_valid_asyncplans.
1188 : : * ----------------------------------------------------------------
1189 : : */
1190 : : static void
1191 : 46 : classify_matching_subplans(AppendState *node)
1192 : : {
1193 : : Bitmapset *valid_asyncplans;
1194 : :
1185 tgl@sss.pgh.pa.us 1195 [ - + ]: 46 : Assert(node->as_valid_subplans_identified);
1886 efujita@postgresql.o 1196 [ - + ]: 46 : Assert(node->as_valid_asyncplans == NULL);
1197 : :
1198 : : /* Nothing to do if there are no valid subplans. */
1199 [ - + ]: 46 : if (bms_is_empty(node->as_valid_subplans))
1200 : : {
1886 efujita@postgresql.o 1201 :UBC 0 : node->as_syncdone = true;
1202 : 0 : node->as_nasyncremain = 0;
1203 : 0 : return;
1204 : : }
1205 : :
1206 : : /* Nothing to do if there are no valid async subplans. */
1886 efujita@postgresql.o 1207 [ - + ]:CBC 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1208 : : {
1886 efujita@postgresql.o 1209 :UBC 0 : node->as_nasyncremain = 0;
1210 : 0 : return;
1211 : : }
1212 : :
1213 : : /* Get valid async subplans. */
1185 tgl@sss.pgh.pa.us 1214 :CBC 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1215 : 46 : node->as_valid_subplans);
1216 : :
1217 : : /* Adjust the valid subplans to contain sync subplans only. */
1886 efujita@postgresql.o 1218 : 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1219 : : valid_asyncplans);
1220 : :
1221 : : /* Save valid async subplans. */
1222 : 46 : node->as_valid_asyncplans = valid_asyncplans;
1223 : : }
|