Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeAppend.c
4 : : * routines to handle append nodes.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/executor/nodeAppend.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : : /* INTERFACE ROUTINES
16 : : * ExecInitAppend - initialize the append node
17 : : * ExecAppend - retrieve the next tuple from the node
18 : : * ExecEndAppend - shut down the append node
19 : : * ExecReScanAppend - rescan the append node
20 : : *
21 : : * NOTES
22 : : * Each append node contains a list of one or more subplans which
23 : : * must be iteratively processed (forwards or backwards).
24 : : * Tuples are retrieved by executing the 'whichplan'th subplan
25 : : * until the subplan stops returning tuples, at which point that
26 : : * plan is shut down and the next started up.
27 : : *
28 : : * Append nodes don't make use of their left and right
29 : : * subtrees, rather they maintain a list of subplans so
30 : : * a typical append node looks like this in the plan tree:
31 : : *
32 : : * ...
33 : : * /
34 : : * Append -------+------+------+--- nil
35 : : * / \ | | |
36 : : * nil nil ... ... ...
37 : : * subplans
38 : : *
39 : : * Append nodes are currently used for unions, and to support
40 : : * inheritance queries, where several relations need to be scanned.
41 : : * For example, in our standard person/student/employee/student-emp
42 : : * example, where student and employee inherit from person
43 : : * and student-emp inherits from student and employee, the
44 : : * query:
45 : : *
46 : : * select name from person
47 : : *
48 : : * generates the plan:
49 : : *
50 : : * |
51 : : * Append -------+-------+--------+--------+
52 : : * / \ | | | |
53 : : * nil nil Scan Scan Scan Scan
54 : : * | | | |
55 : : * person employee student student-emp
56 : : */
57 : :
58 : : #include "postgres.h"
59 : :
60 : : #include "executor/execAsync.h"
61 : : #include "executor/execPartition.h"
62 : : #include "executor/executor.h"
63 : : #include "executor/nodeAppend.h"
64 : : #include "miscadmin.h"
65 : : #include "pgstat.h"
66 : : #include "storage/latch.h"
67 : : #include "storage/lwlock.h"
68 : : #include "utils/wait_event.h"
69 : :
70 : : /* Shared state for parallel-aware Append. */
71 : : struct ParallelAppendState
72 : : {
73 : : LWLock pa_lock; /* mutual exclusion to choose next subplan */
74 : : int pa_next_plan; /* next plan to choose by any worker */
75 : :
76 : : /*
77 : : * pa_finished[i] should be true if no more workers should select subplan
78 : : * i. for a non-partial plan, this should be set to true as soon as a
79 : : * worker selects the plan; for a partial plan, it remains false until
80 : : * some worker executes the plan to completion.
81 : : */
82 : : bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
83 : : };
84 : :
85 : : #define INVALID_SUBPLAN_INDEX -1
86 : : #define EVENT_BUFFER_SIZE 16
87 : :
88 : : static TupleTableSlot *ExecAppend(PlanState *pstate);
89 : : static bool choose_next_subplan_locally(AppendState *node);
90 : : static bool choose_next_subplan_for_leader(AppendState *node);
91 : : static bool choose_next_subplan_for_worker(AppendState *node);
92 : : static void mark_invalid_subplans_as_finished(AppendState *node);
93 : : static void ExecAppendAsyncBegin(AppendState *node);
94 : : static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
95 : : static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
96 : : static void ExecAppendAsyncEventWait(AppendState *node);
97 : : static void classify_matching_subplans(AppendState *node);
98 : :
99 : : /* ----------------------------------------------------------------
100 : : * ExecInitAppend
101 : : *
102 : : * Begin all of the subscans of the append node.
103 : : *
104 : : * (This is potentially wasteful, since the entire result of the
105 : : * append node may not be scanned, but this way all of the
106 : : * structures get allocated in the executor's top level memory
107 : : * block instead of that of the call to ExecAppend.)
108 : : * ----------------------------------------------------------------
109 : : */
110 : : AppendState *
7320 tgl@sss.pgh.pa.us 111 :CBC 9528 : ExecInitAppend(Append *node, EState *estate, int eflags)
112 : : {
8501 113 : 9528 : AppendState *appendstate = makeNode(AppendState);
114 : : PlanState **appendplanstates;
115 : : const TupleTableSlotOps *appendops;
116 : : Bitmapset *validsubplans;
117 : : Bitmapset *asyncplans;
118 : : int nplans;
119 : : int nasyncplans;
120 : : int firstvalid;
121 : : int i,
122 : : j;
123 : :
124 : : /* check for unsupported flags */
7320 125 [ - + ]: 9528 : Assert(!(eflags & EXEC_FLAG_MARK));
126 : :
127 : : /*
128 : : * create new AppendState for our append node
129 : : */
8501 130 : 9528 : appendstate->ps.plan = (Plan *) node;
131 : 9528 : appendstate->ps.state = estate;
3163 andres@anarazel.de 132 : 9528 : appendstate->ps.ExecProcNode = ExecAppend;
133 : :
134 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2899 alvherre@alvh.no-ip. 135 : 9528 : appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
1810 efujita@postgresql.o 136 : 9528 : appendstate->as_syncdone = false;
137 : 9528 : appendstate->as_begun = false;
138 : :
139 : : /* If run-time partition pruning is enabled, then set that up now */
409 amitlan@postgresql.o 140 [ + + ]: 9528 : if (node->part_prune_index >= 0)
141 : : {
142 : : PartitionPruneState *prunestate;
143 : :
144 : : /*
145 : : * Set up pruning data structure. This also initializes the set of
146 : : * subplans to initialize (validsubplans) by taking into account the
147 : : * result of performing initial pruning if any.
148 : : */
408 149 : 372 : prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
150 : 372 : list_length(node->appendplans),
151 : : node->part_prune_index,
152 : : node->apprelids,
153 : : &validsubplans);
2832 tgl@sss.pgh.pa.us 154 : 372 : appendstate->as_prune_state = prunestate;
1440 alvherre@alvh.no-ip. 155 : 372 : nplans = bms_num_members(validsubplans);
156 : :
157 : : /*
158 : : * When no run-time pruning is required and there's at least one
159 : : * subplan, we can fill as_valid_subplans immediately, preventing
160 : : * later calls to ExecFindMatchingSubPlans.
161 : : */
2286 tgl@sss.pgh.pa.us 162 [ + + + + ]: 372 : if (!prunestate->do_exec_prune && nplans > 0)
163 : : {
2899 alvherre@alvh.no-ip. 164 : 135 : appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
1109 tgl@sss.pgh.pa.us 165 : 135 : appendstate->as_valid_subplans_identified = true;
166 : : }
167 : : }
168 : : else
169 : : {
2899 alvherre@alvh.no-ip. 170 : 9156 : nplans = list_length(node->appendplans);
171 : :
172 : : /*
173 : : * When run-time partition pruning is not enabled we can just mark all
174 : : * subplans as valid; they must also all be initialized.
175 : : */
2785 176 [ - + ]: 9156 : Assert(nplans > 0);
2899 177 : 9156 : appendstate->as_valid_subplans = validsubplans =
178 : 9156 : bms_add_range(NULL, 0, nplans - 1);
1109 tgl@sss.pgh.pa.us 179 : 9156 : appendstate->as_valid_subplans_identified = true;
2899 alvherre@alvh.no-ip. 180 : 9156 : appendstate->as_prune_state = NULL;
181 : : }
182 : :
183 : 9528 : appendplanstates = (PlanState **) palloc(nplans *
184 : : sizeof(PlanState *));
185 : :
186 : : /*
187 : : * call ExecInitNode on each of the valid plans to be executed and save
188 : : * the results into the appendplanstates array.
189 : : *
190 : : * While at it, find out the first valid partial plan.
191 : : */
2428 drowley@postgresql.o 192 : 9528 : j = 0;
1810 efujita@postgresql.o 193 : 9528 : asyncplans = NULL;
194 : 9528 : nasyncplans = 0;
2889 alvherre@alvh.no-ip. 195 : 9528 : firstvalid = nplans;
2428 drowley@postgresql.o 196 : 9528 : i = -1;
197 [ + + ]: 38696 : while ((i = bms_next_member(validsubplans, i)) >= 0)
198 : : {
199 : 29168 : Plan *initNode = (Plan *) list_nth(node->appendplans, i);
200 : :
201 : : /*
202 : : * Record async subplans. When executing EvalPlanQual, we treat them
203 : : * as sync ones; don't do this when initializing an EvalPlanQual plan
204 : : * tree.
205 : : */
1810 efujita@postgresql.o 206 [ + + + - ]: 29168 : if (initNode->async_capable && estate->es_epq_active == NULL)
207 : : {
208 : 93 : asyncplans = bms_add_member(asyncplans, j);
209 : 93 : nasyncplans++;
210 : : }
211 : :
212 : : /*
213 : : * Record the lowest appendplans index which is a valid partial plan.
214 : : */
2428 drowley@postgresql.o 215 [ + + + + ]: 29168 : if (i >= node->first_partial_plan && j < firstvalid)
216 : 225 : firstvalid = j;
217 : :
218 : 29168 : appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
219 : : }
220 : :
2889 alvherre@alvh.no-ip. 221 : 9528 : appendstate->as_first_partial_plan = firstvalid;
2899 222 : 9528 : appendstate->appendplans = appendplanstates;
223 : 9528 : appendstate->as_nplans = nplans;
224 : :
225 : : /*
226 : : * Initialize Append's result tuple type and slot. If the child plans all
227 : : * produce the same fixed slot type, we can use that slot type; otherwise
228 : : * make a virtual slot. (Note that the result slot itself is used only to
229 : : * return a null tuple at end of execution; real tuples are returned to
230 : : * the caller in the children's own result slots. What we are doing here
231 : : * is allowing the parent plan node to optimize if the Append will return
232 : : * only one kind of slot.)
233 : : */
451 tgl@sss.pgh.pa.us 234 : 9528 : appendops = ExecGetCommonSlotOps(appendplanstates, j);
235 [ + + ]: 9528 : if (appendops != NULL)
236 : : {
237 : 9020 : ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
238 : : }
239 : : else
240 : : {
241 : 508 : ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
242 : : /* show that the output slot type is not fixed */
243 : 508 : appendstate->ps.resultopsset = true;
244 : 508 : appendstate->ps.resultopsfixed = false;
245 : : }
246 : :
247 : : /* Initialize async state */
1810 efujita@postgresql.o 248 : 9528 : appendstate->as_asyncplans = asyncplans;
249 : 9528 : appendstate->as_nasyncplans = nasyncplans;
250 : 9528 : appendstate->as_asyncrequests = NULL;
1742 251 : 9528 : appendstate->as_asyncresults = NULL;
252 : 9528 : appendstate->as_nasyncresults = 0;
253 : 9528 : appendstate->as_nasyncremain = 0;
1810 254 : 9528 : appendstate->as_needrequest = NULL;
255 : 9528 : appendstate->as_eventset = NULL;
1742 256 : 9528 : appendstate->as_valid_asyncplans = NULL;
257 : :
1810 258 [ + + ]: 9528 : if (nasyncplans > 0)
259 : : {
260 : 47 : appendstate->as_asyncrequests = (AsyncRequest **)
261 : 47 : palloc0(nplans * sizeof(AsyncRequest *));
262 : :
263 : 47 : i = -1;
264 [ + + ]: 140 : while ((i = bms_next_member(asyncplans, i)) >= 0)
265 : : {
266 : : AsyncRequest *areq;
267 : :
95 michael@paquier.xyz 268 :GNC 93 : areq = palloc_object(AsyncRequest);
1810 efujita@postgresql.o 269 :CBC 93 : areq->requestor = (PlanState *) appendstate;
270 : 93 : areq->requestee = appendplanstates[i];
271 : 93 : areq->request_index = i;
272 : 93 : areq->callback_pending = false;
273 : 93 : areq->request_complete = false;
274 : 93 : areq->result = NULL;
275 : :
276 : 93 : appendstate->as_asyncrequests[i] = areq;
277 : : }
278 : :
1742 279 : 47 : appendstate->as_asyncresults = (TupleTableSlot **)
280 : 47 : palloc0(nasyncplans * sizeof(TupleTableSlot *));
281 : :
1109 tgl@sss.pgh.pa.us 282 [ + + ]: 47 : if (appendstate->as_valid_subplans_identified)
1742 efujita@postgresql.o 283 : 44 : classify_matching_subplans(appendstate);
284 : : }
285 : :
286 : : /*
287 : : * Miscellaneous initialization
288 : : */
289 : :
2899 alvherre@alvh.no-ip. 290 : 9528 : appendstate->ps.ps_ProjInfo = NULL;
291 : :
292 : : /* For parallel query, this will be overridden later. */
3022 rhaas@postgresql.org 293 : 9528 : appendstate->choose_next_subplan = choose_next_subplan_locally;
294 : :
8501 tgl@sss.pgh.pa.us 295 : 9528 : return appendstate;
296 : : }
297 : :
298 : : /* ----------------------------------------------------------------
299 : : * ExecAppend
300 : : *
301 : : * Handles iteration over multiple subplans.
302 : : * ----------------------------------------------------------------
303 : : */
304 : : static TupleTableSlot *
3163 andres@anarazel.de 305 : 1470386 : ExecAppend(PlanState *pstate)
306 : : {
307 : 1470386 : AppendState *node = castNode(AppendState, pstate);
308 : : TupleTableSlot *result;
309 : :
310 : : /*
311 : : * If this is the first call after Init or ReScan, we need to do the
312 : : * initialization work.
313 : : */
1810 efujita@postgresql.o 314 [ + + ]: 1470386 : if (!node->as_begun)
315 : : {
316 [ - + ]: 16597 : Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
317 [ - + ]: 16597 : Assert(!node->as_syncdone);
318 : :
319 : : /* Nothing to do if there are no subplans */
2286 tgl@sss.pgh.pa.us 320 [ + + ]: 16597 : if (node->as_nplans == 0)
321 : 30 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
322 : :
323 : : /* If there are any async subplans, begin executing them. */
1810 efujita@postgresql.o 324 [ + + ]: 16567 : if (node->as_nasyncplans > 0)
325 : 37 : ExecAppendAsyncBegin(node);
326 : :
327 : : /*
328 : : * If no sync subplan has been chosen, we must choose one before
329 : : * proceeding.
330 : : */
331 [ + + + + ]: 16567 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
2899 alvherre@alvh.no-ip. 332 : 1579 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
333 : :
1810 efujita@postgresql.o 334 [ + + + - : 14988 : Assert(node->as_syncdone ||
- + ]
335 : : (node->as_whichplan >= 0 &&
336 : : node->as_whichplan < node->as_nplans));
337 : :
338 : : /* And we're initialized. */
339 : 14988 : node->as_begun = true;
340 : : }
341 : :
342 : : for (;;)
10416 bruce@momjian.us 343 : 22774 : {
344 : : PlanState *subnode;
345 : :
3155 andres@anarazel.de 346 [ + + ]: 1491551 : CHECK_FOR_INTERRUPTS();
347 : :
348 : : /*
349 : : * try to get a tuple from an async subplan if any
350 : : */
1810 efujita@postgresql.o 351 [ + + - + ]: 1491551 : if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
352 : : {
353 [ + - ]: 6038 : if (ExecAppendAsyncGetNext(node, &result))
354 : 6037 : return result;
1810 efujita@postgresql.o 355 [ # # ]:UBC 0 : Assert(!node->as_syncdone);
356 [ # # ]: 0 : Assert(bms_is_empty(node->as_needrequest));
357 : : }
358 : :
359 : : /*
360 : : * figure out which sync subplan we are currently processing
361 : : */
3022 rhaas@postgresql.org 362 [ + - - + ]:CBC 1485513 : Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
7602 tgl@sss.pgh.pa.us 363 : 1485513 : subnode = node->appendplans[node->as_whichplan];
364 : :
365 : : /*
366 : : * get a tuple from the subplan
367 : : */
368 : 1485513 : result = ExecProcNode(subnode);
369 : :
370 [ + + + + ]: 1485487 : if (!TupIsNull(result))
371 : : {
372 : : /*
373 : : * If the subplan gave us something then return it as-is. We do
374 : : * NOT make use of the result slot that was set up in
375 : : * ExecInitAppend; there's no need for it.
376 : : */
377 : 1448057 : return result;
378 : : }
379 : :
380 : : /*
381 : : * wait or poll for async events if any. We do this before checking
382 : : * for the end of iteration, because it might drain the remaining
383 : : * async subplans.
384 : : */
1810 efujita@postgresql.o 385 [ + + ]: 37430 : if (node->as_nasyncremain > 0)
386 : 17 : ExecAppendAsyncEventWait(node);
387 : :
388 : : /* choose new sync subplan; if no sync/async subplans, we're done */
389 [ + + + + ]: 37430 : if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
7602 tgl@sss.pgh.pa.us 390 : 14656 : return ExecClearTuple(node->ps.ps_ResultTupleSlot);
391 : : }
392 : : }
393 : :
394 : : /* ----------------------------------------------------------------
395 : : * ExecEndAppend
396 : : *
397 : : * Shuts down the subscans of the append node.
398 : : *
399 : : * Returns nothing of interest.
400 : : * ----------------------------------------------------------------
401 : : */
402 : : void
8501 403 : 9374 : ExecEndAppend(AppendState *node)
404 : : {
405 : : PlanState **appendplans;
406 : : int nplans;
407 : : int i;
408 : :
409 : : /*
410 : : * get information from the node
411 : : */
10105 bruce@momjian.us 412 : 9374 : appendplans = node->appendplans;
8501 tgl@sss.pgh.pa.us 413 : 9374 : nplans = node->as_nplans;
414 : :
415 : : /*
416 : : * shut down each of the subscans
417 : : */
10416 bruce@momjian.us 418 [ + + ]: 38174 : for (i = 0; i < nplans; i++)
6000 tgl@sss.pgh.pa.us 419 : 28800 : ExecEndNode(appendplans[i]);
10416 bruce@momjian.us 420 : 9374 : }
421 : :
422 : : void
5725 tgl@sss.pgh.pa.us 423 : 10057 : ExecReScanAppend(AppendState *node)
424 : : {
1810 efujita@postgresql.o 425 : 10057 : int nasyncplans = node->as_nasyncplans;
426 : : int i;
427 : :
428 : : /*
429 : : * If any PARAM_EXEC Params used in pruning expressions have changed, then
430 : : * we'd better unset the valid subplans so that they are reselected for
431 : : * the new parameter values.
432 : : */
2899 alvherre@alvh.no-ip. 433 [ + + + + ]: 11693 : if (node->as_prune_state &&
434 : 1636 : bms_overlap(node->ps.chgParam,
2835 tgl@sss.pgh.pa.us 435 : 1636 : node->as_prune_state->execparamids))
436 : : {
1109 437 : 1634 : node->as_valid_subplans_identified = false;
2899 alvherre@alvh.no-ip. 438 : 1634 : bms_free(node->as_valid_subplans);
439 : 1634 : node->as_valid_subplans = NULL;
1109 tgl@sss.pgh.pa.us 440 : 1634 : bms_free(node->as_valid_asyncplans);
441 : 1634 : node->as_valid_asyncplans = NULL;
442 : : }
443 : :
6000 444 [ + + ]: 41847 : for (i = 0; i < node->as_nplans; i++)
445 : : {
8259 bruce@momjian.us 446 : 31790 : PlanState *subnode = node->appendplans[i];
447 : :
448 : : /*
449 : : * ExecReScan doesn't know about my subplans, so I have to do
450 : : * changed-parameter signaling myself.
451 : : */
8435 tgl@sss.pgh.pa.us 452 [ + + ]: 31790 : if (node->ps.chgParam != NULL)
453 : 29673 : UpdateChangedParamSet(subnode, node->ps.chgParam);
454 : :
455 : : /*
456 : : * If chgParam of subnode is not null then plan will be re-scanned by
457 : : * first ExecProcNode or by first ExecAsyncRequest.
458 : : */
5725 459 [ + + ]: 31790 : if (subnode->chgParam == NULL)
460 : 8566 : ExecReScan(subnode);
461 : : }
462 : :
463 : : /* Reset async state */
1810 efujita@postgresql.o 464 [ + + ]: 10057 : if (nasyncplans > 0)
465 : : {
466 : 17 : i = -1;
467 [ + + ]: 51 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
468 : : {
469 : 34 : AsyncRequest *areq = node->as_asyncrequests[i];
470 : :
471 : 34 : areq->callback_pending = false;
472 : 34 : areq->request_complete = false;
473 : 34 : areq->result = NULL;
474 : : }
475 : :
1742 476 : 17 : node->as_nasyncresults = 0;
477 : 17 : node->as_nasyncremain = 0;
1810 478 : 17 : bms_free(node->as_needrequest);
479 : 17 : node->as_needrequest = NULL;
480 : : }
481 : :
482 : : /* Let choose_next_subplan_* function handle setting the first subplan */
2899 alvherre@alvh.no-ip. 483 : 10057 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
1810 efujita@postgresql.o 484 : 10057 : node->as_syncdone = false;
485 : 10057 : node->as_begun = false;
3022 rhaas@postgresql.org 486 : 10057 : }
487 : :
488 : : /* ----------------------------------------------------------------
489 : : * Parallel Append Support
490 : : * ----------------------------------------------------------------
491 : : */
492 : :
493 : : /* ----------------------------------------------------------------
494 : : * ExecAppendEstimate
495 : : *
496 : : * Compute the amount of space we'll need in the parallel
497 : : * query DSM, and inform pcxt->estimator about our needs.
498 : : * ----------------------------------------------------------------
499 : : */
500 : : void
501 : 69 : ExecAppendEstimate(AppendState *node,
502 : : ParallelContext *pcxt)
503 : : {
504 : 69 : node->pstate_len =
505 : 69 : add_size(offsetof(ParallelAppendState, pa_finished),
506 : 69 : sizeof(bool) * node->as_nplans);
507 : :
508 : 69 : shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
509 : 69 : shm_toc_estimate_keys(&pcxt->estimator, 1);
510 : 69 : }
511 : :
512 : :
513 : : /* ----------------------------------------------------------------
514 : : * ExecAppendInitializeDSM
515 : : *
516 : : * Set up shared state for Parallel Append.
517 : : * ----------------------------------------------------------------
518 : : */
519 : : void
520 : 69 : ExecAppendInitializeDSM(AppendState *node,
521 : : ParallelContext *pcxt)
522 : : {
523 : : ParallelAppendState *pstate;
524 : :
525 : 69 : pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
526 : 69 : memset(pstate, 0, node->pstate_len);
527 : 69 : LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
528 : 69 : shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
529 : :
530 : 69 : node->as_pstate = pstate;
531 : 69 : node->choose_next_subplan = choose_next_subplan_for_leader;
532 : 69 : }
533 : :
534 : : /* ----------------------------------------------------------------
535 : : * ExecAppendReInitializeDSM
536 : : *
537 : : * Reset shared state before beginning a fresh scan.
538 : : * ----------------------------------------------------------------
539 : : */
540 : : void
3022 rhaas@postgresql.org 541 :UBC 0 : ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
542 : : {
543 : 0 : ParallelAppendState *pstate = node->as_pstate;
544 : :
545 : 0 : pstate->pa_next_plan = 0;
546 : 0 : memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
547 : 0 : }
548 : :
549 : : /* ----------------------------------------------------------------
550 : : * ExecAppendInitializeWorker
551 : : *
552 : : * Copy relevant information from TOC into planstate, and initialize
553 : : * whatever is required to choose and execute the optimal subplan.
554 : : * ----------------------------------------------------------------
555 : : */
556 : : void
3022 rhaas@postgresql.org 557 :CBC 159 : ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
558 : : {
559 : 159 : node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
560 : 159 : node->choose_next_subplan = choose_next_subplan_for_worker;
561 : 159 : }
562 : :
563 : : /* ----------------------------------------------------------------
564 : : * choose_next_subplan_locally
565 : : *
566 : : * Choose next sync subplan for a non-parallel-aware Append,
567 : : * returning false if there are no more.
568 : : * ----------------------------------------------------------------
569 : : */
570 : : static bool
571 : 53517 : choose_next_subplan_locally(AppendState *node)
572 : : {
573 : 53517 : int whichplan = node->as_whichplan;
574 : : int nextplan;
575 : :
576 : : /* We should never be called when there are no subplans */
2286 tgl@sss.pgh.pa.us 577 [ - + ]: 53517 : Assert(node->as_nplans > 0);
578 : :
579 : : /* Nothing to do if syncdone */
1810 efujita@postgresql.o 580 [ + + ]: 53517 : if (node->as_syncdone)
581 : 18 : return false;
582 : :
583 : : /*
584 : : * If first call then have the bms member function choose the first valid
585 : : * sync subplan by initializing whichplan to -1. If there happen to be no
586 : : * valid sync subplans then the bms member function will handle that by
587 : : * returning a negative number which will allow us to exit returning a
588 : : * false value.
589 : : */
2899 alvherre@alvh.no-ip. 590 [ + + ]: 53499 : if (whichplan == INVALID_SUBPLAN_INDEX)
591 : : {
1810 efujita@postgresql.o 592 [ + + ]: 16348 : if (node->as_nasyncplans > 0)
593 : : {
594 : : /* We'd have filled as_valid_subplans already */
1109 tgl@sss.pgh.pa.us 595 [ - + ]: 19 : Assert(node->as_valid_subplans_identified);
596 : : }
597 [ + + ]: 16329 : else if (!node->as_valid_subplans_identified)
598 : : {
2899 alvherre@alvh.no-ip. 599 : 1693 : node->as_valid_subplans =
401 amitlan@postgresql.o 600 : 1693 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1109 tgl@sss.pgh.pa.us 601 : 1693 : node->as_valid_subplans_identified = true;
602 : : }
603 : :
2899 alvherre@alvh.no-ip. 604 : 16348 : whichplan = -1;
605 : : }
606 : :
607 : : /* Ensure whichplan is within the expected range */
608 [ + - - + ]: 53499 : Assert(whichplan >= -1 && whichplan <= node->as_nplans);
609 : :
610 [ + + ]: 53499 : if (ScanDirectionIsForward(node->ps.state->es_direction))
611 : 53490 : nextplan = bms_next_member(node->as_valid_subplans, whichplan);
612 : : else
613 : 9 : nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
614 : :
615 [ + + ]: 53499 : if (nextplan < 0)
616 : : {
617 : : /* Set as_syncdone if in async mode */
1810 efujita@postgresql.o 618 [ + + ]: 16051 : if (node->as_nasyncplans > 0)
619 : 17 : node->as_syncdone = true;
2899 alvherre@alvh.no-ip. 620 : 16051 : return false;
621 : : }
622 : :
623 : 37448 : node->as_whichplan = nextplan;
624 : :
3022 rhaas@postgresql.org 625 : 37448 : return true;
626 : : }
627 : :
628 : : /* ----------------------------------------------------------------
629 : : * choose_next_subplan_for_leader
630 : : *
631 : : * Try to pick a plan which doesn't commit us to doing much
632 : : * work locally, so that as much work as possible is done in
633 : : * the workers. Cheapest subplans are at the end.
634 : : * ----------------------------------------------------------------
635 : : */
636 : : static bool
637 : 236 : choose_next_subplan_for_leader(AppendState *node)
638 : : {
639 : 236 : ParallelAppendState *pstate = node->as_pstate;
640 : :
641 : : /* Backward scan is not supported by parallel-aware plans */
642 [ - + ]: 236 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
643 : :
644 : : /* We should never be called when there are no subplans */
2286 tgl@sss.pgh.pa.us 645 [ - + ]: 236 : Assert(node->as_nplans > 0);
646 : :
3022 rhaas@postgresql.org 647 : 236 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
648 : :
649 [ + + ]: 236 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
650 : : {
651 : : /* Mark just-completed subplan as finished. */
652 : 176 : node->as_pstate->pa_finished[node->as_whichplan] = true;
653 : : }
654 : : else
655 : : {
656 : : /* Start with last subplan. */
657 : 60 : node->as_whichplan = node->as_nplans - 1;
658 : :
659 : : /*
660 : : * If we've yet to determine the valid subplans then do so now. If
661 : : * run-time pruning is disabled then the valid subplans will always be
662 : : * set to all subplans.
663 : : */
1109 tgl@sss.pgh.pa.us 664 [ + + ]: 60 : if (!node->as_valid_subplans_identified)
665 : : {
2899 alvherre@alvh.no-ip. 666 : 12 : node->as_valid_subplans =
401 amitlan@postgresql.o 667 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1109 tgl@sss.pgh.pa.us 668 : 12 : node->as_valid_subplans_identified = true;
669 : :
670 : : /*
671 : : * Mark each invalid plan as finished to allow the loop below to
672 : : * select the first valid subplan.
673 : : */
2899 alvherre@alvh.no-ip. 674 : 12 : mark_invalid_subplans_as_finished(node);
675 : : }
676 : : }
677 : :
678 : : /* Loop until we find a subplan to execute. */
3022 rhaas@postgresql.org 679 [ + + ]: 386 : while (pstate->pa_finished[node->as_whichplan])
680 : : {
681 [ + + ]: 210 : if (node->as_whichplan == 0)
682 : : {
683 : 60 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
684 : 60 : node->as_whichplan = INVALID_SUBPLAN_INDEX;
685 : 60 : LWLockRelease(&pstate->pa_lock);
686 : 60 : return false;
687 : : }
688 : :
689 : : /*
690 : : * We needn't pay attention to as_valid_subplans here as all invalid
691 : : * plans have been marked as finished.
692 : : */
693 : 150 : node->as_whichplan--;
694 : : }
695 : :
696 : : /* If non-partial, immediately mark as finished. */
2889 alvherre@alvh.no-ip. 697 [ + + ]: 176 : if (node->as_whichplan < node->as_first_partial_plan)
3022 rhaas@postgresql.org 698 : 48 : node->as_pstate->pa_finished[node->as_whichplan] = true;
699 : :
700 : 176 : LWLockRelease(&pstate->pa_lock);
701 : :
702 : 176 : return true;
703 : : }
704 : :
705 : : /* ----------------------------------------------------------------
706 : : * choose_next_subplan_for_worker
707 : : *
708 : : * Choose next subplan for a parallel-aware Append, returning
709 : : * false if there are no more.
710 : : *
711 : : * We start from the first plan and advance through the list;
712 : : * when we get back to the end, we loop back to the first
713 : : * partial plan. This assigns the non-partial plans first in
714 : : * order of descending cost and then spreads out the workers
715 : : * as evenly as possible across the remaining partial plans.
716 : : * ----------------------------------------------------------------
717 : : */
718 : : static bool
719 : 244 : choose_next_subplan_for_worker(AppendState *node)
720 : : {
721 : 244 : ParallelAppendState *pstate = node->as_pstate;
722 : :
723 : : /* Backward scan is not supported by parallel-aware plans */
724 [ - + ]: 244 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
725 : :
726 : : /* We should never be called when there are no subplans */
2286 tgl@sss.pgh.pa.us 727 [ - + ]: 244 : Assert(node->as_nplans > 0);
728 : :
3022 rhaas@postgresql.org 729 : 244 : LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
730 : :
731 : : /* Mark just-completed subplan as finished. */
732 [ + + ]: 244 : if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
733 : 103 : node->as_pstate->pa_finished[node->as_whichplan] = true;
734 : :
735 : : /*
736 : : * If we've yet to determine the valid subplans then do so now. If
737 : : * run-time pruning is disabled then the valid subplans will always be set
738 : : * to all subplans.
739 : : */
1109 tgl@sss.pgh.pa.us 740 [ + + ]: 141 : else if (!node->as_valid_subplans_identified)
741 : : {
2899 alvherre@alvh.no-ip. 742 : 12 : node->as_valid_subplans =
401 amitlan@postgresql.o 743 : 12 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1109 tgl@sss.pgh.pa.us 744 : 12 : node->as_valid_subplans_identified = true;
745 : :
2899 alvherre@alvh.no-ip. 746 : 12 : mark_invalid_subplans_as_finished(node);
747 : : }
748 : :
749 : : /* If all the plans are already done, we have nothing to do */
3022 rhaas@postgresql.org 750 [ + + ]: 244 : if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
751 : : {
752 : 120 : LWLockRelease(&pstate->pa_lock);
753 : 120 : return false;
754 : : }
755 : :
756 : : /* Save the plan from which we are starting the search. */
2957 757 : 124 : node->as_whichplan = pstate->pa_next_plan;
758 : :
759 : : /* Loop until we find a valid subplan to execute. */
3022 760 [ + + ]: 210 : while (pstate->pa_finished[pstate->pa_next_plan])
761 : : {
762 : : int nextplan;
763 : :
2897 alvherre@alvh.no-ip. 764 : 107 : nextplan = bms_next_member(node->as_valid_subplans,
765 : : pstate->pa_next_plan);
766 [ + + ]: 107 : if (nextplan >= 0)
767 : : {
768 : : /* Advance to the next valid plan. */
769 : 73 : pstate->pa_next_plan = nextplan;
770 : : }
2889 771 [ + + ]: 34 : else if (node->as_whichplan > node->as_first_partial_plan)
772 : : {
773 : : /*
774 : : * Try looping back to the first valid partial plan, if there is
775 : : * one. If there isn't, arrange to bail out below.
776 : : */
2897 777 : 29 : nextplan = bms_next_member(node->as_valid_subplans,
2889 778 : 29 : node->as_first_partial_plan - 1);
2897 779 : 29 : pstate->pa_next_plan =
780 [ - + ]: 29 : nextplan < 0 ? node->as_whichplan : nextplan;
781 : : }
782 : : else
783 : : {
784 : : /*
785 : : * At last plan, and either there are no partial plans or we've
786 : : * tried them all. Arrange to bail out.
787 : : */
3022 rhaas@postgresql.org 788 : 5 : pstate->pa_next_plan = node->as_whichplan;
789 : : }
790 : :
791 [ + + ]: 107 : if (pstate->pa_next_plan == node->as_whichplan)
792 : : {
793 : : /* We've tried everything! */
794 : 21 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
795 : 21 : LWLockRelease(&pstate->pa_lock);
796 : 21 : return false;
797 : : }
798 : : }
799 : :
800 : : /* Pick the plan we found, and advance pa_next_plan one more time. */
2897 alvherre@alvh.no-ip. 801 : 103 : node->as_whichplan = pstate->pa_next_plan;
802 : 103 : pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
803 : : pstate->pa_next_plan);
804 : :
805 : : /*
806 : : * If there are no more valid plans then try setting the next plan to the
807 : : * first valid partial plan.
808 : : */
809 [ + + ]: 103 : if (pstate->pa_next_plan < 0)
810 : : {
811 : 13 : int nextplan = bms_next_member(node->as_valid_subplans,
2889 812 : 13 : node->as_first_partial_plan - 1);
813 : :
2897 814 [ + - ]: 13 : if (nextplan >= 0)
815 : 13 : pstate->pa_next_plan = nextplan;
816 : : else
817 : : {
818 : : /*
819 : : * There are no valid partial plans, and we already chose the last
820 : : * non-partial plan; so flag that there's nothing more for our
821 : : * fellow workers to do.
822 : : */
3021 rhaas@postgresql.org 823 :UBC 0 : pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
824 : : }
825 : : }
826 : :
827 : : /* If non-partial, immediately mark as finished. */
2889 alvherre@alvh.no-ip. 828 [ + + ]:CBC 103 : if (node->as_whichplan < node->as_first_partial_plan)
3022 rhaas@postgresql.org 829 : 21 : node->as_pstate->pa_finished[node->as_whichplan] = true;
830 : :
831 : 103 : LWLockRelease(&pstate->pa_lock);
832 : :
833 : 103 : return true;
834 : : }
835 : :
836 : : /*
837 : : * mark_invalid_subplans_as_finished
838 : : * Marks the ParallelAppendState's pa_finished as true for each invalid
839 : : * subplan.
840 : : *
841 : : * This function should only be called for parallel Append with run-time
842 : : * pruning enabled.
843 : : */
844 : : static void
2899 alvherre@alvh.no-ip. 845 : 24 : mark_invalid_subplans_as_finished(AppendState *node)
846 : : {
847 : : int i;
848 : :
849 : : /* Only valid to call this while in parallel Append mode */
850 [ - + ]: 24 : Assert(node->as_pstate);
851 : :
852 : : /* Shouldn't have been called when run-time pruning is not enabled */
853 [ - + ]: 24 : Assert(node->as_prune_state);
854 : :
855 : : /* Nothing to do if all plans are valid */
856 [ - + ]: 24 : if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
2899 alvherre@alvh.no-ip. 857 :UBC 0 : return;
858 : :
859 : : /* Mark all non-valid plans as finished */
2899 alvherre@alvh.no-ip. 860 [ + + ]:CBC 81 : for (i = 0; i < node->as_nplans; i++)
861 : : {
862 [ + + ]: 57 : if (!bms_is_member(i, node->as_valid_subplans))
863 : 24 : node->as_pstate->pa_finished[i] = true;
864 : : }
865 : : }
866 : :
867 : : /* ----------------------------------------------------------------
868 : : * Asynchronous Append Support
869 : : * ----------------------------------------------------------------
870 : : */
871 : :
872 : : /* ----------------------------------------------------------------
873 : : * ExecAppendAsyncBegin
874 : : *
875 : : * Begin executing designed async-capable subplans.
876 : : * ----------------------------------------------------------------
877 : : */
878 : : static void
1810 efujita@postgresql.o 879 : 37 : ExecAppendAsyncBegin(AppendState *node)
880 : : {
881 : : int i;
882 : :
883 : : /* Backward scan is not supported by async-aware Appends. */
884 [ - + ]: 37 : Assert(ScanDirectionIsForward(node->ps.state->es_direction));
885 : :
886 : : /* We should never be called when there are no subplans */
1742 887 [ - + ]: 37 : Assert(node->as_nplans > 0);
888 : :
889 : : /* We should never be called when there are no async subplans. */
1810 890 [ - + ]: 37 : Assert(node->as_nasyncplans > 0);
891 : :
892 : : /* If we've yet to determine the valid subplans then do so now. */
1109 tgl@sss.pgh.pa.us 893 [ + + ]: 37 : if (!node->as_valid_subplans_identified)
894 : : {
1810 efujita@postgresql.o 895 : 2 : node->as_valid_subplans =
401 amitlan@postgresql.o 896 : 2 : ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
1109 tgl@sss.pgh.pa.us 897 : 2 : node->as_valid_subplans_identified = true;
898 : :
1742 efujita@postgresql.o 899 : 2 : classify_matching_subplans(node);
900 : : }
901 : :
902 : : /* Initialize state variables. */
903 : 37 : node->as_syncdone = bms_is_empty(node->as_valid_subplans);
904 : 37 : node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
905 : :
906 : : /* Nothing to do if there are no valid async subplans. */
1810 907 [ - + ]: 37 : if (node->as_nasyncremain == 0)
1810 efujita@postgresql.o 908 :UBC 0 : return;
909 : :
910 : : /* Make a request for each of the valid async subplans. */
1810 efujita@postgresql.o 911 :CBC 37 : i = -1;
912 [ + + ]: 109 : while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
913 : : {
914 : 72 : AsyncRequest *areq = node->as_asyncrequests[i];
915 : :
916 [ - + ]: 72 : Assert(areq->request_index == i);
917 [ - + ]: 72 : Assert(!areq->callback_pending);
918 : :
919 : : /* Do the actual work. */
920 : 72 : ExecAsyncRequest(areq);
921 : : }
922 : : }
923 : :
924 : : /* ----------------------------------------------------------------
925 : : * ExecAppendAsyncGetNext
926 : : *
927 : : * Get the next tuple from any of the asynchronous subplans.
928 : : * ----------------------------------------------------------------
929 : : */
930 : : static bool
931 : 6038 : ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
932 : : {
933 : 6038 : *result = NULL;
934 : :
935 : : /* We should never be called when there are no valid async subplans. */
936 [ - + ]: 6038 : Assert(node->as_nasyncremain > 0);
937 : :
938 : : /* Request a tuple asynchronously. */
939 [ + + ]: 6038 : if (ExecAppendAsyncRequest(node, result))
940 : 5937 : return true;
941 : :
942 [ + + ]: 153 : while (node->as_nasyncremain > 0)
943 : : {
944 [ - + ]: 122 : CHECK_FOR_INTERRUPTS();
945 : :
946 : : /* Wait or poll for async events. */
947 : 122 : ExecAppendAsyncEventWait(node);
948 : :
949 : : /* Request a tuple asynchronously. */
950 [ + + ]: 121 : if (ExecAppendAsyncRequest(node, result))
951 : 69 : return true;
952 : :
953 : : /* Break from loop if there's any sync subplan that isn't complete. */
954 [ - + ]: 52 : if (!node->as_syncdone)
1810 efujita@postgresql.o 955 :UBC 0 : break;
956 : : }
957 : :
958 : : /*
959 : : * If all sync subplans are complete, we're totally done scanning the
960 : : * given node. Otherwise, we're done with the asynchronous stuff but must
961 : : * continue scanning the sync subplans.
962 : : */
1810 efujita@postgresql.o 963 [ + - ]:CBC 31 : if (node->as_syncdone)
964 : : {
965 [ - + ]: 31 : Assert(node->as_nasyncremain == 0);
966 : 31 : *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
967 : 31 : return true;
968 : : }
969 : :
1810 efujita@postgresql.o 970 :UBC 0 : return false;
971 : : }
972 : :
973 : : /* ----------------------------------------------------------------
974 : : * ExecAppendAsyncRequest
975 : : *
976 : : * Request a tuple asynchronously.
977 : : * ----------------------------------------------------------------
978 : : */
979 : : static bool
1810 efujita@postgresql.o 980 :CBC 6159 : ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
981 : : {
982 : : Bitmapset *needrequest;
983 : : int i;
984 : :
985 : : /* Nothing to do if there are no async subplans needing a new request. */
986 [ + + ]: 6159 : if (bms_is_empty(node->as_needrequest))
987 : : {
1787 988 [ - + ]: 77 : Assert(node->as_nasyncresults == 0);
1810 989 : 77 : return false;
990 : : }
991 : :
992 : : /*
993 : : * If there are any asynchronously-generated results that have not yet
994 : : * been returned, we have nothing to do; just return one of them.
995 : : */
996 [ + + ]: 6082 : if (node->as_nasyncresults > 0)
997 : : {
998 : 982 : --node->as_nasyncresults;
999 : 982 : *result = node->as_asyncresults[node->as_nasyncresults];
1000 : 982 : return true;
1001 : : }
1002 : :
1003 : : /* Make a new request for each of the async subplans that need it. */
1004 : 5100 : needrequest = node->as_needrequest;
1005 : 5100 : node->as_needrequest = NULL;
1006 : 5100 : i = -1;
1007 [ + + ]: 11103 : while ((i = bms_next_member(needrequest, i)) >= 0)
1008 : : {
1009 : 6003 : AsyncRequest *areq = node->as_asyncrequests[i];
1010 : :
1011 : : /* Do the actual work. */
1012 : 6003 : ExecAsyncRequest(areq);
1013 : : }
1014 : 5100 : bms_free(needrequest);
1015 : :
1016 : : /* Return one of the asynchronously-generated results if any. */
1017 [ + + ]: 5100 : if (node->as_nasyncresults > 0)
1018 : : {
1019 : 5024 : --node->as_nasyncresults;
1020 : 5024 : *result = node->as_asyncresults[node->as_nasyncresults];
1021 : 5024 : return true;
1022 : : }
1023 : :
1024 : 76 : return false;
1025 : : }
1026 : :
1027 : : /* ----------------------------------------------------------------
1028 : : * ExecAppendAsyncEventWait
1029 : : *
1030 : : * Wait or poll for file descriptor events and fire callbacks.
1031 : : * ----------------------------------------------------------------
1032 : : */
1033 : : static void
1034 : 139 : ExecAppendAsyncEventWait(AppendState *node)
1035 : : {
368 heikki.linnakangas@i 1036 : 139 : int nevents = node->as_nasyncplans + 2;
1810 efujita@postgresql.o 1037 [ + + ]: 139 : long timeout = node->as_syncdone ? -1 : 0;
1038 : : WaitEvent occurred_event[EVENT_BUFFER_SIZE];
1039 : : int noccurred;
1040 : : int i;
1041 : :
1042 : : /* We should never be called when there are no valid async subplans. */
1043 [ - + ]: 139 : Assert(node->as_nasyncremain > 0);
1044 : :
843 heikki.linnakangas@i 1045 [ - + ]: 139 : Assert(node->as_eventset == NULL);
1046 : 139 : node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
1810 efujita@postgresql.o 1047 : 139 : AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
1048 : : NULL, NULL);
1049 : :
1050 : : /* Give each waiting subplan a chance to add an event. */
1051 : 139 : i = -1;
1052 [ + + ]: 424 : while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
1053 : : {
1054 : 286 : AsyncRequest *areq = node->as_asyncrequests[i];
1055 : :
1056 [ + + ]: 286 : if (areq->callback_pending)
1057 : 238 : ExecAsyncConfigureWait(areq);
1058 : : }
1059 : :
1060 : : /*
1061 : : * No need for further processing if none of the subplans configured any
1062 : : * events.
1063 : : */
1689 1064 [ + + ]: 138 : if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
1065 : : {
1066 : 1 : FreeWaitEventSet(node->as_eventset);
1067 : 1 : node->as_eventset = NULL;
1068 : 8 : return;
1069 : : }
1070 : :
1071 : : /*
1072 : : * Add the process latch to the set, so that we wake up to process the
1073 : : * standard interrupts with CHECK_FOR_INTERRUPTS().
1074 : : *
1075 : : * NOTE: For historical reasons, it's important that this is added to the
1076 : : * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely,
1077 : : * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
1078 : : * any other events are in the set. That's a poor design, it's
1079 : : * questionable for postgres_fdw to be doing that in the first place, but
1080 : : * we cannot change it now. The pattern has possibly been copied to other
1081 : : * extensions too.
1082 : : */
368 heikki.linnakangas@i 1083 : 137 : AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
1084 : : MyLatch, NULL);
1085 : :
1086 : : /* Return at most EVENT_BUFFER_SIZE events in one call. */
1787 efujita@postgresql.o 1087 [ - + ]: 137 : if (nevents > EVENT_BUFFER_SIZE)
1787 efujita@postgresql.o 1088 :UBC 0 : nevents = EVENT_BUFFER_SIZE;
1089 : :
1090 : : /*
1091 : : * If the timeout is -1, wait until at least one event occurs. If the
1092 : : * timeout is 0, poll for events, but do not wait at all.
1093 : : */
1810 efujita@postgresql.o 1094 :CBC 137 : noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
1095 : : nevents, WAIT_EVENT_APPEND_READY);
1096 : 137 : FreeWaitEventSet(node->as_eventset);
1097 : 137 : node->as_eventset = NULL;
1098 [ + + ]: 137 : if (noccurred == 0)
1099 : 7 : return;
1100 : :
1101 : : /* Deliver notifications. */
1102 [ + + ]: 278 : for (i = 0; i < noccurred; i++)
1103 : : {
1104 : 148 : WaitEvent *w = &occurred_event[i];
1105 : :
1106 : : /*
1107 : : * Each waiting subplan should have registered its wait event with
1108 : : * user_data pointing back to its AsyncRequest.
1109 : : */
1110 [ + + ]: 148 : if ((w->events & WL_SOCKET_READABLE) != 0)
1111 : : {
1112 : 147 : AsyncRequest *areq = (AsyncRequest *) w->user_data;
1113 : :
1686 1114 [ + - ]: 147 : if (areq->callback_pending)
1115 : : {
1116 : : /*
1117 : : * Mark it as no longer needing a callback. We must do this
1118 : : * before dispatching the callback in case the callback resets
1119 : : * the flag.
1120 : : */
1121 : 147 : areq->callback_pending = false;
1122 : :
1123 : : /* Do the actual work. */
1124 : 147 : ExecAsyncNotify(areq);
1125 : : }
1126 : : }
1127 : :
1128 : : /* Handle standard interrupts */
368 heikki.linnakangas@i 1129 [ + + ]: 148 : if ((w->events & WL_LATCH_SET) != 0)
1130 : : {
368 heikki.linnakangas@i 1131 :GBC 1 : ResetLatch(MyLatch);
1132 [ - + ]: 1 : CHECK_FOR_INTERRUPTS();
1133 : : }
1134 : : }
1135 : : }
1136 : :
1137 : : /* ----------------------------------------------------------------
1138 : : * ExecAsyncAppendResponse
1139 : : *
1140 : : * Receive a response from an asynchronous request we made.
1141 : : * ----------------------------------------------------------------
1142 : : */
1143 : : void
1810 efujita@postgresql.o 1144 :CBC 6227 : ExecAsyncAppendResponse(AsyncRequest *areq)
1145 : : {
1146 : 6227 : AppendState *node = (AppendState *) areq->requestor;
1147 : 6227 : TupleTableSlot *slot = areq->result;
1148 : :
1149 : : /* The result should be a TupleTableSlot or NULL. */
1150 [ + + - + ]: 6227 : Assert(slot == NULL || IsA(slot, TupleTableSlot));
1151 : :
1152 : : /* Nothing to do if the request is pending. */
1153 [ + + ]: 6227 : if (!areq->request_complete)
1154 : : {
1155 : : /* The request would have been pending for a callback. */
1156 [ - + ]: 160 : Assert(areq->callback_pending);
1157 : 160 : return;
1158 : : }
1159 : :
1160 : : /* If the result is NULL or an empty slot, there's nothing more to do. */
1161 [ + + - + ]: 6067 : if (TupIsNull(slot))
1162 : : {
1163 : : /* The ending subplan wouldn't have been pending for a callback. */
1164 [ - + ]: 60 : Assert(!areq->callback_pending);
1165 : 60 : --node->as_nasyncremain;
1166 : 60 : return;
1167 : : }
1168 : :
1169 : : /* Save result so we can return it. */
1170 [ - + ]: 6007 : Assert(node->as_nasyncresults < node->as_nasyncplans);
1171 : 6007 : node->as_asyncresults[node->as_nasyncresults++] = slot;
1172 : :
1173 : : /*
1174 : : * Mark the subplan that returned a result as ready for a new request. We
1175 : : * don't launch another one here immediately because it might complete.
1176 : : */
1177 : 6007 : node->as_needrequest = bms_add_member(node->as_needrequest,
1178 : : areq->request_index);
1179 : : }
1180 : :
1181 : : /* ----------------------------------------------------------------
1182 : : * classify_matching_subplans
1183 : : *
1184 : : * Classify the node's as_valid_subplans into sync ones and
1185 : : * async ones, adjust it to contain sync ones only, and save
1186 : : * async ones in the node's as_valid_asyncplans.
1187 : : * ----------------------------------------------------------------
1188 : : */
1189 : : static void
1190 : 46 : classify_matching_subplans(AppendState *node)
1191 : : {
1192 : : Bitmapset *valid_asyncplans;
1193 : :
1109 tgl@sss.pgh.pa.us 1194 [ - + ]: 46 : Assert(node->as_valid_subplans_identified);
1810 efujita@postgresql.o 1195 [ - + ]: 46 : Assert(node->as_valid_asyncplans == NULL);
1196 : :
1197 : : /* Nothing to do if there are no valid subplans. */
1198 [ - + ]: 46 : if (bms_is_empty(node->as_valid_subplans))
1199 : : {
1810 efujita@postgresql.o 1200 :UBC 0 : node->as_syncdone = true;
1201 : 0 : node->as_nasyncremain = 0;
1202 : 0 : return;
1203 : : }
1204 : :
1205 : : /* Nothing to do if there are no valid async subplans. */
1810 efujita@postgresql.o 1206 [ - + ]:CBC 46 : if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1207 : : {
1810 efujita@postgresql.o 1208 :UBC 0 : node->as_nasyncremain = 0;
1209 : 0 : return;
1210 : : }
1211 : :
1212 : : /* Get valid async subplans. */
1109 tgl@sss.pgh.pa.us 1213 :CBC 46 : valid_asyncplans = bms_intersect(node->as_asyncplans,
1214 : 46 : node->as_valid_subplans);
1215 : :
1216 : : /* Adjust the valid subplans to contain sync subplans only. */
1810 efujita@postgresql.o 1217 : 46 : node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1218 : : valid_asyncplans);
1219 : :
1220 : : /* Save valid async subplans. */
1221 : 46 : node->as_valid_asyncplans = valid_asyncplans;
1222 : : }
|