/*-------------------------------------------------------------------------
 *
 * nodeGatherMerge.c
 *      Scan a plan in multiple workers, and do order-preserving merge.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *      src/backend/executor/nodeGatherMerge.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "executor/executor.h"
#include "executor/execParallel.h"
#include "executor/nodeGatherMerge.h"
#include "executor/tqueue.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
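
/*
 * A Gather Merge node assumes that each of its inputs -- the leader's own
 * execution of the outer plan, plus one tuple queue per launched worker --
 * already delivers tuples in the required sort order.  We keep the current
 * head tuple of every input in a slot, arrange the slot indexes in a binary
 * heap ordered by the sort keys, and repeatedly return the tuple at the top
 * of the heap, refilling from whichever input supplied it.
 */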

/*
 * When we read tuples from workers, it's a good idea to read several at once
 * for efficiency when possible: this minimizes context-switching overhead.
 * But reading too many at a time wastes memory without improving performance.
 * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
 */
#define MAX_TUPLE_STORE 10

/*
 * Pending-tuple array for each worker.  This holds additional tuples that
 * we were able to fetch from the worker, but can't process yet.  In addition,
 * this struct holds the "done" flag indicating the worker is known to have
 * no more tuples.  (We do not use this struct for the leader; we don't keep
 * any pending tuples for the leader, and the need_to_scan_locally flag serves
 * as its "done" indicator.)
 */
typedef struct GMReaderTupleBuffer
{
    MinimalTuple *tuple;        /* array of length MAX_TUPLE_STORE */
    int         nTuples;        /* number of tuples currently stored */
    int         readCounter;    /* index of next tuple to extract */
    bool        done;           /* true if reader is known exhausted */
} GMReaderTupleBuffer;

static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                                      bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
static void gather_merge_clear_tuples(GatherMergeState *gm_state);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                                  bool nowait);
static void load_tuple_array(GatherMergeState *gm_state, int reader);

/* ----------------------------------------------------------------
 *      ExecInitGatherMerge
 * ----------------------------------------------------------------
 */
GatherMergeState *
ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
{
    GatherMergeState *gm_state;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather merge node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gm_state = makeNode(GatherMergeState);
    gm_state->ps.plan = (Plan *) node;
    gm_state->ps.state = estate;
    gm_state->ps.ExecProcNode = ExecGatherMerge;

    gm_state->initialized = false;
    gm_state->gm_initialized = false;
    gm_state->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gm_state->ps);

    /*
     * GatherMerge doesn't support checking a qual (it's always more efficient
     * to do it in the child node).
     */
    Assert(!node->plan.qual);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);

    /*
     * Leader may access ExecProcNode result directly (if
     * need_to_scan_locally), or from workers via tuple queue.  So we can't
     * trivially rely on the slot type being fixed for expressions evaluated
     * within this node.
     */
    gm_state->ps.outeropsset = true;
    gm_state->ps.outeropsfixed = false;

    /*
     * Store the tuple descriptor into gather merge state, so we can use it
     * while initializing the gather merge slots.
     */
    tupDesc = ExecGetResultType(outerPlanState(gm_state));
    gm_state->tupDesc = tupDesc;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gm_state->ps);
    ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);

    /*
     * Without a projection, the result slot type is not trivially known; see
     * comment above.
     */
    if (gm_state->ps.ps_ProjInfo == NULL)
    {
        gm_state->ps.resultopsset = true;
        gm_state->ps.resultopsfixed = false;
    }

    /*
     * initialize sort-key information
     */
    if (node->numCols)
    {
        int         i;

        gm_state->gm_nkeys = node->numCols;
        gm_state->gm_sortkeys =
            palloc0(sizeof(SortSupportData) * node->numCols);

        for (i = 0; i < node->numCols; i++)
        {
            SortSupport sortKey = gm_state->gm_sortkeys + i;

            sortKey->ssup_cxt = CurrentMemoryContext;
            sortKey->ssup_collation = node->collations[i];
            sortKey->ssup_nulls_first = node->nullsFirst[i];
            sortKey->ssup_attno = node->sortColIdx[i];

            /*
             * We don't perform abbreviated key conversion here, for the same
             * reasons that it isn't used in MergeAppend
             */
            sortKey->abbreviate = false;

            PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
        }
    }

    /* Now allocate the workspace for gather merge */
    gather_merge_setup(gm_state);

    return gm_state;
}

/* ----------------------------------------------------------------
 *      ExecGatherMerge(node)
 *
 *      Scans the relation via multiple workers and returns
 *      the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
    GatherMergeState *node = castNode(GatherMergeState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * As with Gather, we don't launch workers until this node is actually
     * executed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gm->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gm->initParam,
                                                 gm->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gm->initParam);

            /* Try to launch workers. */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /*
             * Count number of workers originally wanted and actually
             * launched.
             */
            estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
            estate->es_parallel_workers_launched += pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
        }

        /* allow leader to participate if enabled or no choice */
        if (parallel_leader_participation || node->nreaders == 0)
            node->need_to_scan_locally = true;
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_merge_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *      ExecEndGatherMerge
 *
 *      frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGatherMerge(GatherMergeState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGatherMerge(node);
}

/* ----------------------------------------------------------------
 *      ExecShutdownGatherMerge
 *
 *      Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGatherMerge(GatherMergeState *node)
{
    ExecShutdownGatherMergeWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *      ExecShutdownGatherMergeWorkers
 *
 *      Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *      ExecReScanGatherMerge
 *
 *      Prepare to re-scan the result of a GatherMerge.
 * ----------------------------------------------------------------
 */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
    GatherMerge *gm = (GatherMerge *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherMergeWorkers(node);

    /* Free any unused tuples, so we don't leak memory across rescans */
    gather_merge_clear_tuples(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;
    node->gm_initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gm->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gm->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/*
 * Set up the data structures that we'll need for Gather Merge.
 *
 * We allocate these once on the basis of gm->num_workers, which is an
 * upper bound for the number of workers we'll actually have.  During
 * a rescan, we reset the structures to empty.  This approach simplifies
 * not leaking memory across rescans.
 *
 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
 * are for workers.  The values placed into gm_heap correspond to indexes
 * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
 * 0 to n-1; it has no entry for the leader.
 */
static void
gather_merge_setup(GatherMergeState *gm_state)
{
    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
    int         nreaders = gm->num_workers;
    int         i;

    /*
     * Allocate gm_slots for the number of workers + one more slot for leader.
     * Slot 0 is always for the leader.  Leader always calls ExecProcNode() to
     * read the tuple, and then stores it directly into its gm_slots entry.
     * For other slots, code below will call ExecInitExtraTupleSlot() to
     * create a slot for the worker's results.  Note that during any single
     * scan, we might have fewer than num_workers available workers, in which
     * case the extra array entries go unused.
     */
    gm_state->gm_slots = (TupleTableSlot **)
        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));

    /* Allocate the tuple slot and tuple array for each worker */
    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
        palloc0(nreaders * sizeof(GMReaderTupleBuffer));

    for (i = 0; i < nreaders; i++)
    {
        /* Allocate the tuple array with length MAX_TUPLE_STORE */
        gm_state->gm_tuple_buffers[i].tuple =
            (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);

        /* Initialize tuple slot for worker */
        gm_state->gm_slots[i + 1] =
            ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
                                   &TTSOpsMinimalTuple);
    }

    /* Allocate the binary heap that will merge the input streams */
    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                            heap_compare_slots,
                                            gm_state);
}

/*
 * Initialize the Gather Merge.
 *
 * Reset data structures to ensure they're empty.  Then pull at least one
 * tuple from leader + each worker (or set its "done" indicator), and set up
 * the heap.
 */
static void
gather_merge_init(GatherMergeState *gm_state)
{
    int         nreaders = gm_state->nreaders;
    bool        nowait = true;
    int         i;

    /* Assert that gather_merge_setup made enough space */
    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

    /* Reset leader's tuple slot to empty */
    gm_state->gm_slots[0] = NULL;

    /* Reset the tuple slot and tuple array for each worker */
    for (i = 0; i < nreaders; i++)
    {
        /* Reset tuple array to empty */
        gm_state->gm_tuple_buffers[i].nTuples = 0;
        gm_state->gm_tuple_buffers[i].readCounter = 0;
        /* Reset done flag to not-done */
        gm_state->gm_tuple_buffers[i].done = false;
        /* Ensure output slot is empty */
        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }

    /* Reset binary heap to empty */
    binaryheap_reset(gm_state->gm_heap);

    /*
     * First, try to read a tuple from each worker (including leader) in
     * nowait mode.  After this, if not all workers were able to produce a
     * tuple (or a "done" indication), then re-read from remaining workers,
     * this time using wait mode.  Add all live readers (those producing at
     * least one tuple) to the heap.
     */
reread:
    for (i = 0; i <= nreaders; i++)
    {
        CHECK_FOR_INTERRUPTS();

        /* skip this source if already known done */
        if ((i == 0) ? gm_state->need_to_scan_locally :
            !gm_state->gm_tuple_buffers[i - 1].done)
        {
            if (TupIsNull(gm_state->gm_slots[i]))
            {
                /* Don't have a tuple yet, try to get one */
                if (gather_merge_readnext(gm_state, i, nowait))
                    binaryheap_add_unordered(gm_state->gm_heap,
                                             Int32GetDatum(i));
            }
            else
            {
                /*
                 * We already got at least one tuple from this worker, but
                 * might as well see if it has any more ready by now.
                 */
                load_tuple_array(gm_state, i);
            }
        }
    }

    /* need not recheck leader, since nowait doesn't matter for it */
    for (i = 1; i <= nreaders; i++)
    {
        if (!gm_state->gm_tuple_buffers[i - 1].done &&
            TupIsNull(gm_state->gm_slots[i]))
        {
            nowait = false;
            goto reread;
        }
    }

    /* Now heapify the heap. */
    binaryheap_build(gm_state->gm_heap);

    gm_state->gm_initialized = true;
}

/*
 * Clear out the tuple table slot, and any unused pending tuples,
 * for each gather merge input.
 */
static void
gather_merge_clear_tuples(GatherMergeState *gm_state)
{
    int         i;

    for (i = 0; i < gm_state->nreaders; i++)
    {
        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];

        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
            pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);

        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }
}

/*
 * Read the next tuple for gather merge.
 *
 * Fetch the next tuple in sort order from the heap.
 */
static TupleTableSlot *
gather_merge_getnext(GatherMergeState *gm_state)
{
    int         i;

    if (!gm_state->gm_initialized)
    {
        /*
         * First time through: pull the first tuple from each participant, and
         * set up the heap.
         */
        gather_merge_init(gm_state);
    }
    else
    {
        /*
         * Otherwise, pull the next tuple from whichever participant we
         * returned from last time, and reinsert that participant's index into
         * the heap, because it might now compare differently against the
         * other elements of the heap.
         */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));

        if (gather_merge_readnext(gm_state, i, false))
            binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
        else
        {
            /* reader exhausted, remove it from heap */
            (void) binaryheap_remove_first(gm_state->gm_heap);
        }
    }

    if (binaryheap_empty(gm_state->gm_heap))
    {
        /* All the queues are exhausted, and so is the heap */
        gather_merge_clear_tuples(gm_state);
        return NULL;
    }
    else
    {
        /* Return next tuple from whichever participant has the leading one */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
        return gm_state->gm_slots[i];
    }
}

/*
 * Read tuple(s) for given reader in nowait mode, and load into its tuple
 * array, until we have MAX_TUPLE_STORE of them or would have to block.
 */
static void
load_tuple_array(GatherMergeState *gm_state, int reader)
{
    GMReaderTupleBuffer *tuple_buffer;
    int         i;

    /* Don't do anything if this is the leader. */
    if (reader == 0)
        return;

    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    /* If there's nothing in the array, reset the counters to zero. */
    if (tuple_buffer->nTuples == tuple_buffer->readCounter)
        tuple_buffer->nTuples = tuple_buffer->readCounter = 0;

    /* Try to fill additional slots in the array. */
    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
    {
        MinimalTuple tuple;

        tuple = gm_readnext_tuple(gm_state,
                                  reader,
                                  true,
                                  &tuple_buffer->done);
        if (!tuple)
            break;
        tuple_buffer->tuple[i] = tuple;
        tuple_buffer->nTuples++;
    }
}

/*
 * Store the next tuple for a given reader into the appropriate slot.
 *
 * Returns true if successful, false if not (either reader is exhausted,
 * or we didn't want to wait for a tuple).  Sets done flag if reader
 * is found to be exhausted.
 */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
    GMReaderTupleBuffer *tuple_buffer;
    MinimalTuple tup;

    /*
     * If we're being asked to generate a tuple from the leader, then we just
     * call ExecProcNode as normal to produce one.
     */
    if (reader == 0)
    {
        if (gm_state->need_to_scan_locally)
        {
            PlanState  *outerPlan = outerPlanState(gm_state);
            TupleTableSlot *outerTupleSlot;
            EState     *estate = gm_state->ps.state;

            /* Install our DSA area while executing the plan. */
            estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
            {
                gm_state->gm_slots[0] = outerTupleSlot;
                return true;
            }
            /* need_to_scan_locally serves as "done" flag for leader */
            gm_state->need_to_scan_locally = false;
        }
        return false;
    }

    /* Otherwise, check the state of the relevant tuple buffer. */
    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    if (tuple_buffer->nTuples > tuple_buffer->readCounter)
    {
        /* Return any tuple previously read that is still buffered. */
        tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
    }
    else if (tuple_buffer->done)
    {
        /* Reader is known to be exhausted. */
        return false;
    }
    else
    {
        /* Read and buffer next tuple. */
        tup = gm_readnext_tuple(gm_state,
                                reader,
                                nowait,
                                &tuple_buffer->done);
        if (!tup)
            return false;

        /*
         * Attempt to read more tuples in nowait mode and store them in the
         * pending-tuple array for the reader.
         */
        load_tuple_array(gm_state, reader);
    }

    Assert(tup);

    /* Build the TupleTableSlot for the given tuple */
    ExecStoreMinimalTuple(tup,  /* tuple to store */
                          gm_state->gm_slots[reader],   /* slot in which to
                                                         * store the tuple */
                          true);    /* pfree tuple when done with it */

    return true;
}
711 : : /*
712 : : * Attempt to read a tuple from given worker.
713 : : */
714 : : static MinimalTuple
715 : 15173 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
716 : : bool *done)
717 : : {
718 : : TupleQueueReader *reader;
719 : : MinimalTuple tup;
720 : :
721 : : /* Check for async events, particularly messages from workers. */
722 [ + + ]: 15173 : CHECK_FOR_INTERRUPTS();
723 : :
724 : : /*
725 : : * Attempt to read a tuple.
726 : : *
727 : : * Note that TupleQueueReaderNext will just return NULL for a worker which
728 : : * fails to initialize. We'll treat that worker as having produced no
729 : : * tuples; WaitForParallelWorkersToFinish will error out when we get
730 : : * there.
731 : : */
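    /* 'nreader' is a gm_slots[] index (1..nreaders); reader[] is 0-based */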
    reader = gm_state->reader[nreader - 1];
    tup = TupleQueueReaderNext(reader, nowait, done);

    /*
     * Since we'll be buffering these across multiple calls, we need to make a
     * copy.
     */
    return tup ? heap_copy_minimal_tuple(tup, 0) : NULL;
}

/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/*
 * Compare the tuples in the two given slots.
 */
static int32
heap_compare_slots(Datum a, Datum b, void *arg)
{
    GatherMergeState *node = (GatherMergeState *) arg;
    SlotNumber  slot1 = DatumGetInt32(a);
    SlotNumber  slot2 = DatumGetInt32(b);

    TupleTableSlot *s1 = node->gm_slots[slot1];
    TupleTableSlot *s2 = node->gm_slots[slot2];
    int         nkey;

    Assert(!TupIsNull(s1));
    Assert(!TupIsNull(s2));

    for (nkey = 0; nkey < node->gm_nkeys; nkey++)
    {
        SortSupport sortKey = node->gm_sortkeys + nkey;
        AttrNumber  attno = sortKey->ssup_attno;
        Datum       datum1,
                    datum2;
        bool        isNull1,
                    isNull2;
        int         compare;

        datum1 = slot_getattr(s1, attno, &isNull1);
        datum2 = slot_getattr(s2, attno, &isNull2);

        compare = ApplySortComparator(datum1, isNull1,
                                      datum2, isNull2,
                                      sortKey);
        if (compare != 0)
        {
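            /*
             * binaryheap.c gives us a max-heap with respect to this
             * comparator, so invert the result: that way the slot whose
             * tuple sorts first ends up at the top of the heap.
             */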
            INVERT_COMPARE_RESULT(compare);
            return compare;
        }
    }
    return 0;
}