Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * nodeGatherMerge.c
4 : : * Scan a plan in multiple workers, and do order-preserving merge.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/executor/nodeGatherMerge.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/htup_details.h"
18 : : #include "executor/executor.h"
19 : : #include "executor/execParallel.h"
20 : : #include "executor/nodeGatherMerge.h"
21 : : #include "executor/tqueue.h"
22 : : #include "lib/binaryheap.h"
23 : : #include "miscadmin.h"
24 : : #include "optimizer/optimizer.h"
25 : :
26 : : /*
27 : : * When we read tuples from workers, it's a good idea to read several at once
28 : : * for efficiency when possible: this minimizes context-switching overhead.
29 : : * But reading too many at a time wastes memory without improving performance.
30 : : * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
31 : : */
32 : : #define MAX_TUPLE_STORE 10
33 : :
34 : : /*
35 : : * Pending-tuple array for each worker. This holds additional tuples that
36 : : * we were able to fetch from the worker, but can't process yet. In addition,
37 : : * this struct holds the "done" flag indicating the worker is known to have
38 : : * no more tuples. (We do not use this struct for the leader; we don't keep
39 : : * any pending tuples for the leader, and the need_to_scan_locally flag serves
40 : : * as its "done" indicator.)
41 : : */
42 : : typedef struct GMReaderTupleBuffer
43 : : {
44 : : MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
45 : : int nTuples; /* number of tuples currently stored */
46 : : int readCounter; /* index of next tuple to extract */
47 : : bool done; /* true if reader is known exhausted */
48 : : } GMReaderTupleBuffer;
49 : :
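/*
 * A minimal usage sketch (not part of the original file): the buffer
 * invariant is that entries readCounter .. nTuples-1 are still pending.
 * A hypothetical helper that pops the next pending tuple, or returns
 * NULL once the array is drained, could look like this; the real
 * consumption logic lives in gather_merge_readnext() below.
 */
static inline MinimalTuple
gm_buffer_pop_sketch(GMReaderTupleBuffer *buf)
{
	if (buf->readCounter < buf->nTuples)
		return buf->tuple[buf->readCounter++];
	/* Empty: caller must read from the queue next, or stop if buf->done. */
	return NULL;
}
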
50 : : static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
51 : : static int32 heap_compare_slots(Datum a, Datum b, void *arg);
52 : : static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
53 : : static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
54 : : bool nowait, bool *done);
55 : : static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
56 : : static void gather_merge_setup(GatherMergeState *gm_state);
57 : : static void gather_merge_init(GatherMergeState *gm_state);
58 : : static void gather_merge_clear_tuples(GatherMergeState *gm_state);
59 : : static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
60 : : bool nowait);
61 : : static void load_tuple_array(GatherMergeState *gm_state, int reader);
62 : :
63 : : /* ----------------------------------------------------------------
64 : : * ExecInitGatherMerge
65 : : * ----------------------------------------------------------------
66 : : */
67 : : GatherMergeState *
3204 rhaas@postgresql.org 68 :CBC 177 : ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
69 : : {
70 : : GatherMergeState *gm_state;
71 : : Plan *outerNode;
72 : : TupleDesc tupDesc;
73 : :
74 : : /* A Gather Merge node never has an inner plan. */
75 [ - + ]: 177 : Assert(innerPlan(node) == NULL);
76 : :
77 : : /*
78 : : * create state structure
79 : : */
80 : 177 : gm_state = makeNode(GatherMergeState);
81 : 177 : gm_state->ps.plan = (Plan *) node;
82 : 177 : gm_state->ps.state = estate;
3074 andres@anarazel.de 83 : 177 : gm_state->ps.ExecProcNode = ExecGatherMerge;
84 : :
3030 tgl@sss.pgh.pa.us 85 : 177 : gm_state->initialized = false;
86 : 177 : gm_state->gm_initialized = false;
3031 rhaas@postgresql.org 87 : 177 : gm_state->tuples_needed = -1;
88 : :
89 : : /*
90 : : * Miscellaneous initialization
91 : : *
92 : : * create expression context for node
93 : : */
3204 94 : 177 : ExecAssignExprContext(estate, &gm_state->ps);
95 : :
96 : : /*
97 : : * GatherMerge doesn't support checking a qual (it's always more efficient
98 : : * to do it in the child node).
99 : : */
3030 tgl@sss.pgh.pa.us 100 [ - + ]: 177 : Assert(!node->plan.qual);
101 : :
102 : : /*
103 : : * now initialize outer plan
104 : : */
3204 rhaas@postgresql.org 105 : 177 : outerNode = outerPlan(node);
106 : 177 : outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);
107 : :
108 : : /*
109 : : * Leader may access ExecProcNode result directly (if
110 : : * need_to_scan_locally), or from workers via tuple queue. So we can't
111 : : * trivially rely on the slot type being fixed for expressions evaluated
112 : : * within this node.
113 : : */
2588 andres@anarazel.de 114 : 177 : gm_state->ps.outeropsset = true;
115 : 177 : gm_state->ps.outeropsfixed = false;
116 : :
117 : : /*
118 : : * Store the tuple descriptor into gather merge state, so we can use it
119 : : * while initializing the gather merge slots.
120 : : */
2860 121 : 177 : tupDesc = ExecGetResultType(outerPlanState(gm_state));
2943 rhaas@postgresql.org 122 : 177 : gm_state->tupDesc = tupDesc;
123 : :
124 : : /*
125 : : * Initialize result type and projection.
126 : : */
2594 andres@anarazel.de 127 : 177 : ExecInitResultTypeTL(&gm_state->ps);
2943 rhaas@postgresql.org 128 : 177 : ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);
129 : :
130 : : /*
131 : : * Without a projection, the result slot type is not trivially known;
132 : : * see the comment above.
133 : : */
2588 andres@anarazel.de 134 [ + + ]: 177 : if (gm_state->ps.ps_ProjInfo == NULL)
135 : : {
136 : 171 : gm_state->ps.resultopsset = true;
137 : 171 : gm_state->ps.resultopsfixed = false;
138 : : }
139 : :
140 : : /*
141 : : * initialize sort-key information
142 : : */
3204 rhaas@postgresql.org 143 [ + - ]: 177 : if (node->numCols)
144 : : {
145 : : int i;
146 : :
147 : 177 : gm_state->gm_nkeys = node->numCols;
6 michael@paquier.xyz 148 :GNC 177 : gm_state->gm_sortkeys = palloc0_array(SortSupportData, node->numCols);
149 : :
3204 rhaas@postgresql.org 150 [ + + ]:CBC 414 : for (i = 0; i < node->numCols; i++)
151 : : {
152 : 237 : SortSupport sortKey = gm_state->gm_sortkeys + i;
153 : :
154 : 237 : sortKey->ssup_cxt = CurrentMemoryContext;
155 : 237 : sortKey->ssup_collation = node->collations[i];
156 : 237 : sortKey->ssup_nulls_first = node->nullsFirst[i];
157 : 237 : sortKey->ssup_attno = node->sortColIdx[i];
158 : :
159 : : /*
160 : : * We don't perform abbreviated key conversion here, for the same
161 : : * reasons that it isn't used in MergeAppend
162 : : */
163 : 237 : sortKey->abbreviate = false;
164 : :
165 : 237 : PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
166 : : }
167 : : }
168 : :
169 : : /* Now allocate the workspace for gather merge */
3029 tgl@sss.pgh.pa.us 170 : 177 : gather_merge_setup(gm_state);
171 : :
3204 rhaas@postgresql.org 172 : 177 : return gm_state;
173 : : }
174 : :
175 : : /* ----------------------------------------------------------------
176 : : * ExecGatherMerge(node)
177 : : *
178 : : * Scans the relation via multiple workers and returns
179 : : * the next qualifying tuple.
180 : : * ----------------------------------------------------------------
181 : : */
182 : : static TupleTableSlot *
3074 andres@anarazel.de 183 : 128061 : ExecGatherMerge(PlanState *pstate)
184 : : {
185 : 128061 : GatherMergeState *node = castNode(GatherMergeState, pstate);
186 : : TupleTableSlot *slot;
187 : : ExprContext *econtext;
188 : :
3066 189 [ + + ]: 128061 : CHECK_FOR_INTERRUPTS();
190 : :
191 : : /*
192 : : * As with Gather, we don't launch workers until this node is actually
193 : : * executed.
194 : : */
3204 rhaas@postgresql.org 195 [ + + ]: 128061 : if (!node->initialized)
196 : : {
197 : 84 : EState *estate = node->ps.state;
3030 tgl@sss.pgh.pa.us 198 : 84 : GatherMerge *gm = castNode(GatherMerge, node->ps.plan);
199 : :
200 : : /*
201 : : * Sometimes we might have to run without parallelism; but if parallel
202 : : * mode is active then we can try to fire up some workers.
203 : : */
2972 rhaas@postgresql.org 204 [ + - + - ]: 84 : if (gm->num_workers > 0 && estate->es_use_parallel_mode)
205 : : {
206 : : ParallelContext *pcxt;
207 : :
208 : : /* Initialize, or re-initialize, shared state needed by workers. */
3204 209 [ + + ]: 84 : if (!node->pei)
1258 tgl@sss.pgh.pa.us 210 : 69 : node->pei = ExecInitParallelPlan(outerPlanState(node),
211 : : estate,
212 : : gm->initParam,
213 : : gm->num_workers,
214 : : node->tuples_needed);
215 : : else
216 : 15 : ExecParallelReinitialize(outerPlanState(node),
2952 rhaas@postgresql.org 217 : 15 : node->pei,
218 : : gm->initParam);
219 : :
220 : : /* Try to launch workers. */
3204 221 : 84 : pcxt = node->pei->pcxt;
222 : 84 : LaunchParallelWorkers(pcxt);
223 : : /* We save # workers launched for the benefit of EXPLAIN */
224 : 84 : node->nworkers_launched = pcxt->nworkers_launched;
225 : :
226 : : /*
227 : : * Count number of workers originally wanted and actually
228 : : * launched.
229 : : */
433 michael@paquier.xyz 230 : 84 : estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
231 : 84 : estate->es_parallel_workers_launched += pcxt->nworkers_launched;
232 : :
233 : : /* Set up tuple queue readers to read the results. */
3204 rhaas@postgresql.org 234 [ + + ]: 84 : if (pcxt->nworkers_launched > 0)
235 : : {
3015 andres@anarazel.de 236 : 78 : ExecParallelCreateReaders(node->pei);
237 : : /* Make a working array showing the active readers */
3028 tgl@sss.pgh.pa.us 238 : 78 : node->nreaders = pcxt->nworkers_launched;
239 : 78 : node->reader = (TupleQueueReader **)
240 : 78 : palloc(node->nreaders * sizeof(TupleQueueReader *));
241 : 78 : memcpy(node->reader, node->pei->reader,
242 : 78 : node->nreaders * sizeof(TupleQueueReader *));
243 : : }
244 : : else
245 : : {
246 : : /* No workers? Then never mind. */
247 : 6 : node->nreaders = 0;
248 : 6 : node->reader = NULL;
249 : : }
250 : : }
251 : :
252 : : /* allow leader to participate if enabled or no choice */
2953 rhaas@postgresql.org 253 [ + + + + ]: 84 : if (parallel_leader_participation || node->nreaders == 0)
254 : 81 : node->need_to_scan_locally = true;
3204 255 : 84 : node->initialized = true;
256 : : }
257 : :
258 : : /*
259 : : * Reset per-tuple memory context to free any expression evaluation
260 : : * storage allocated in the previous tuple cycle.
261 : : */
262 : 128061 : econtext = node->ps.ps_ExprContext;
263 : 128061 : ResetExprContext(econtext);
264 : :
265 : : /*
266 : : * Get next tuple, either from one of our workers, or by running the plan
267 : : * ourselves.
268 : : */
269 : 128061 : slot = gather_merge_getnext(node);
270 [ + + - + ]: 128061 : if (TupIsNull(slot))
271 : 66 : return NULL;
272 : :
273 : : /* If no projection is required, we're done. */
2943 274 [ + - ]: 127995 : if (node->ps.ps_ProjInfo == NULL)
275 : 127995 : return slot;
276 : :
277 : : /*
278 : : * Form the result tuple using ExecProject(), and return it.
279 : : */
3204 rhaas@postgresql.org 280 :UBC 0 : econtext->ecxt_outertuple = slot;
281 : 0 : return ExecProject(node->ps.ps_ProjInfo);
282 : : }
283 : :
284 : : /* ----------------------------------------------------------------
285 : : * ExecEndGatherMerge
286 : : *
287 : : * frees any storage allocated through C routines.
288 : : * ----------------------------------------------------------------
289 : : */
290 : : void
3204 rhaas@postgresql.org 291 :CBC 177 : ExecEndGatherMerge(GatherMergeState *node)
292 : : {
3135 bruce@momjian.us 293 : 177 : ExecEndNode(outerPlanState(node)); /* let children clean up first */
3204 rhaas@postgresql.org 294 : 177 : ExecShutdownGatherMerge(node);
295 : 177 : }
296 : :
297 : : /* ----------------------------------------------------------------
298 : : * ExecShutdownGatherMerge
299 : : *
300 : : * Destroy the setup for parallel workers including parallel context.
301 : : * ----------------------------------------------------------------
302 : : */
303 : : void
304 : 246 : ExecShutdownGatherMerge(GatherMergeState *node)
305 : : {
306 : 246 : ExecShutdownGatherMergeWorkers(node);
307 : :
308 : : /* Now destroy the parallel context. */
309 [ + + ]: 246 : if (node->pei != NULL)
310 : : {
311 : 69 : ExecParallelCleanup(node->pei);
312 : 69 : node->pei = NULL;
313 : : }
314 : 246 : }
315 : :
316 : : /* ----------------------------------------------------------------
317 : : * ExecShutdownGatherMergeWorkers
318 : : *
319 : : * Stop all the parallel workers.
320 : : * ----------------------------------------------------------------
321 : : */
322 : : static void
323 : 270 : ExecShutdownGatherMergeWorkers(GatherMergeState *node)
324 : : {
325 [ + + ]: 270 : if (node->pei != NULL)
326 : 84 : ExecParallelFinish(node->pei);
327 : :
328 : : /* Flush local copy of reader array */
3028 tgl@sss.pgh.pa.us 329 [ + + ]: 270 : if (node->reader)
330 : 78 : pfree(node->reader);
331 : 270 : node->reader = NULL;
3204 rhaas@postgresql.org 332 : 270 : }
333 : :
334 : : /* ----------------------------------------------------------------
335 : : * ExecReScanGatherMerge
336 : : *
337 : : * Prepare to re-scan the result of a GatherMerge.
338 : : * ----------------------------------------------------------------
339 : : */
340 : : void
341 : 24 : ExecReScanGatherMerge(GatherMergeState *node)
342 : : {
3030 tgl@sss.pgh.pa.us 343 : 24 : GatherMerge *gm = (GatherMerge *) node->ps.plan;
344 : 24 : PlanState *outerPlan = outerPlanState(node);
345 : :
346 : : /* Make sure any existing workers are gracefully shut down */
3204 rhaas@postgresql.org 347 : 24 : ExecShutdownGatherMergeWorkers(node);
348 : :
349 : : /* Free any unused tuples, so we don't leak memory across rescans */
3029 tgl@sss.pgh.pa.us 350 : 24 : gather_merge_clear_tuples(node);
351 : :
352 : : /* Mark node so that shared state will be rebuilt at next call */
3204 rhaas@postgresql.org 353 : 24 : node->initialized = false;
3043 tgl@sss.pgh.pa.us 354 : 24 : node->gm_initialized = false;
355 : :
356 : : /*
357 : : * Set child node's chgParam to tell it that the next scan might deliver a
358 : : * different set of rows within the leader process. (The overall rowset
359 : : * shouldn't change, but the leader process's subset might; hence nodes
360 : : * between here and the parallel table scan node mustn't optimize on the
361 : : * assumption of an unchanging rowset.)
362 : : */
3030 363 [ + - ]: 24 : if (gm->rescan_param >= 0)
364 : 24 : outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
365 : : gm->rescan_param);
366 : :
367 : : /*
368 : : * If chgParam of subnode is not null then plan will be re-scanned by
369 : : * first ExecProcNode. Note: because this does nothing if we have a
370 : : * rescan_param, it's currently guaranteed that parallel-aware child nodes
371 : : * will not see a ReScan call until after they get a ReInitializeDSM call.
372 : : * That ordering might not be something to rely on, though. A good rule
373 : : * of thumb is that ReInitializeDSM should reset only shared state, ReScan
374 : : * should reset only local state, and anything that depends on both of
375 : : * those steps being finished must wait until the first ExecProcNode call.
376 : : */
377 [ - + ]: 24 : if (outerPlan->chgParam == NULL)
3030 tgl@sss.pgh.pa.us 378 :UBC 0 : ExecReScan(outerPlan);
3204 rhaas@postgresql.org 379 :CBC 24 : }
380 : :
381 : : /*
382 : : * Set up the data structures that we'll need for Gather Merge.
383 : : *
384 : : * We allocate these once on the basis of gm->num_workers, which is an
385 : : * upper bound for the number of workers we'll actually have. During
386 : : * a rescan, we reset the structures to empty. This approach makes it
387 : : * simple to avoid leaking memory across rescans.
388 : : *
389 : : * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
390 : : * are for workers. The values placed into gm_heap correspond to indexes
391 : : * in gm_slots[]. The gm_tuple_buffers[] array, however, is indexed from
392 : : * 0 to n-1; it has no entry for the leader.
393 : : */
394 : : static void
3029 tgl@sss.pgh.pa.us 395 : 177 : gather_merge_setup(GatherMergeState *gm_state)
396 : : {
397 : 177 : GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
398 : 177 : int nreaders = gm->num_workers;
399 : : int i;
400 : :
401 : : /*
402 : : * Allocate gm_slots for the number of workers + one more slot for leader.
403 : : * Slot 0 is always for the leader. Leader always calls ExecProcNode() to
404 : : * read the tuple, and then stores it directly into its gm_slots entry.
405 : : * For other slots, code below will call ExecInitExtraTupleSlot() to
406 : : * create a slot for the worker's results. Note that during any single
407 : : * scan, we might have fewer than num_workers available workers, in which
408 : : * case the extra array entries go unused.
409 : : */
410 : 177 : gm_state->gm_slots = (TupleTableSlot **)
411 : 177 : palloc0((nreaders + 1) * sizeof(TupleTableSlot *));
412 : :
413 : : /* Allocate the tuple slot and tuple array for each worker */
414 : 177 : gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
415 : 177 : palloc0(nreaders * sizeof(GMReaderTupleBuffer));
416 : :
417 [ + + ]: 645 : for (i = 0; i < nreaders; i++)
418 : : {
419 : : /* Allocate the tuple array with length MAX_TUPLE_STORE */
6 michael@paquier.xyz 420 :GNC 468 : gm_state->gm_tuple_buffers[i].tuple = palloc0_array(MinimalTuple, MAX_TUPLE_STORE);
421 : :
422 : : /* Initialize tuple slot for worker */
2860 andres@anarazel.de 423 :CBC 468 : gm_state->gm_slots[i + 1] =
2588 424 : 468 : ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
425 : : &TTSOpsMinimalTuple);
426 : : }
427 : :
428 : : /* Allocate the resources for the merge */
3029 tgl@sss.pgh.pa.us 429 : 177 : gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
430 : : heap_compare_slots,
431 : : gm_state);
432 : 177 : }
433 : :
434 : : /*
435 : : * Initialize the Gather Merge.
436 : : *
437 : : * Reset data structures to ensure they're empty. Then pull at least one
438 : : * tuple from leader + each worker (or set its "done" indicator), and set up
439 : : * the heap.
440 : : */
441 : : static void
442 : 84 : gather_merge_init(GatherMergeState *gm_state)
443 : : {
444 : 84 : int nreaders = gm_state->nreaders;
445 : 84 : bool nowait = true;
446 : : int i;
447 : :
448 : : /* Assert that gather_merge_setup made enough space */
449 [ - + ]: 84 : Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);
450 : :
451 : : /* Reset leader's tuple slot to empty */
452 : 84 : gm_state->gm_slots[0] = NULL;
453 : :
454 : : /* Reset the tuple slot and tuple array for each worker */
455 [ + + ]: 297 : for (i = 0; i < nreaders; i++)
456 : : {
457 : : /* Reset tuple array to empty */
458 : 213 : gm_state->gm_tuple_buffers[i].nTuples = 0;
459 : 213 : gm_state->gm_tuple_buffers[i].readCounter = 0;
460 : : /* Reset done flag to not-done */
461 : 213 : gm_state->gm_tuple_buffers[i].done = false;
462 : : /* Ensure output slot is empty */
463 : 213 : ExecClearTuple(gm_state->gm_slots[i + 1]);
464 : : }
465 : :
466 : : /* Reset binary heap to empty */
467 : 84 : binaryheap_reset(gm_state->gm_heap);
468 : :
469 : : /*
470 : : * First, try to read a tuple from each worker (including leader) in
471 : : * nowait mode. After this, if not all workers were able to produce a
472 : : * tuple (or a "done" indication), then re-read from remaining workers,
473 : : * this time using wait mode. Add all live readers (those producing at
474 : : * least one tuple) to the heap.
475 : : */
3204 rhaas@postgresql.org 476 : 148 : reread:
3029 tgl@sss.pgh.pa.us 477 [ + + ]: 677 : for (i = 0; i <= nreaders; i++)
478 : : {
3066 andres@anarazel.de 479 [ + + ]: 529 : CHECK_FOR_INTERRUPTS();
480 : :
481 : : /* skip this source if already known done */
3029 tgl@sss.pgh.pa.us 482 [ + + + + ]: 910 : if ((i == 0) ? gm_state->need_to_scan_locally :
483 : 381 : !gm_state->gm_tuple_buffers[i - 1].done)
484 : : {
3030 485 [ + + + + ]: 503 : if (TupIsNull(gm_state->gm_slots[i]))
486 : : {
487 : : /* Don't have a tuple yet, try to get one */
488 [ + + ]: 630 : if (gather_merge_readnext(gm_state, i, nowait))
489 : 208 : binaryheap_add_unordered(gm_state->gm_heap,
490 : : Int32GetDatum(i));
491 : : }
492 : : else
493 : : {
494 : : /*
495 : : * We already got at least one tuple from this worker, but
496 : : * might as well see if it has any more ready by now.
497 : : */
498 : 81 : load_tuple_array(gm_state, i);
499 : : }
500 : : }
501 : : }
502 : :
503 : : /* need not recheck leader, since nowait doesn't matter for it */
3029 504 [ + + ]: 378 : for (i = 1; i <= nreaders; i++)
505 : : {
506 [ + + ]: 294 : if (!gm_state->gm_tuple_buffers[i - 1].done &&
3030 507 [ + - + + ]: 145 : TupIsNull(gm_state->gm_slots[i]))
508 : : {
509 : 64 : nowait = false;
3204 rhaas@postgresql.org 510 : 64 : goto reread;
511 : : }
512 : : }
513 : :
514 : : /* Now heapify the heap. */
515 : 84 : binaryheap_build(gm_state->gm_heap);
516 : :
517 : 84 : gm_state->gm_initialized = true;
518 : 84 : }
519 : :
520 : : /*
521 : : * Clear out the tuple table slot, and any unused pending tuples,
522 : : * for each gather merge input.
523 : : */
524 : : static void
3029 tgl@sss.pgh.pa.us 525 : 90 : gather_merge_clear_tuples(GatherMergeState *gm_state)
526 : : {
527 : : int i;
528 : :
3204 rhaas@postgresql.org 529 [ + + ]: 333 : for (i = 0; i < gm_state->nreaders; i++)
530 : : {
3029 tgl@sss.pgh.pa.us 531 : 243 : GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
532 : :
533 [ + + ]: 250 : while (tuple_buffer->readCounter < tuple_buffer->nTuples)
1978 tmunro@postgresql.or 534 : 7 : pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
535 : :
3029 tgl@sss.pgh.pa.us 536 : 243 : ExecClearTuple(gm_state->gm_slots[i + 1]);
537 : : }
3204 rhaas@postgresql.org 538 : 90 : }
539 : :
540 : : /*
541 : : * Read the next tuple for gather merge.
542 : : *
543 : : * Fetch the sorted tuple out of the heap.
544 : : */
545 : : static TupleTableSlot *
546 : 128061 : gather_merge_getnext(GatherMergeState *gm_state)
547 : : {
548 : : int i;
549 : :
3201 tgl@sss.pgh.pa.us 550 [ + + ]: 128061 : if (!gm_state->gm_initialized)
551 : : {
552 : : /*
553 : : * First time through: pull the first tuple from each participant, and
554 : : * set up the heap.
555 : : */
3204 rhaas@postgresql.org 556 : 84 : gather_merge_init(gm_state);
557 : : }
558 : : else
559 : : {
560 : : /*
561 : : * Otherwise, pull the next tuple from whichever participant we
562 : : * returned from last time, and reinsert that participant's index into
563 : : * the heap, because it might now compare differently against the
564 : : * other elements of the heap.
565 : : */
566 : 127977 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
567 : :
568 [ + + ]: 127977 : if (gather_merge_readnext(gm_state, i, false))
569 : 127793 : binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
570 : : else
571 : : {
572 : : /* reader exhausted, remove it from heap */
573 : 184 : (void) binaryheap_remove_first(gm_state->gm_heap);
574 : : }
575 : : }
576 : :
577 [ + + ]: 128061 : if (binaryheap_empty(gm_state->gm_heap))
578 : : {
579 : : /* All the queues are exhausted, and so is the heap */
3029 tgl@sss.pgh.pa.us 580 : 66 : gather_merge_clear_tuples(gm_state);
3182 rhaas@postgresql.org 581 : 66 : return NULL;
582 : : }
583 : : else
584 : : {
585 : : /* Return next tuple from whichever participant has the leading one */
3204 586 : 127995 : i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
587 : 127995 : return gm_state->gm_slots[i];
588 : : }
589 : : }
590 : :
591 : : /*
592 : : * Read tuple(s) for given reader in nowait mode, and load into its tuple
593 : : * array, until we have MAX_TUPLE_STORE of them or would have to block.
594 : : */
595 : : static void
3030 tgl@sss.pgh.pa.us 596 : 1586 : load_tuple_array(GatherMergeState *gm_state, int reader)
597 : : {
598 : : GMReaderTupleBuffer *tuple_buffer;
599 : : int i;
600 : :
601 : : /* Don't do anything if this is the leader. */
3029 602 [ + + ]: 1586 : if (reader == 0)
3204 rhaas@postgresql.org 603 : 61 : return;
604 : :
3029 tgl@sss.pgh.pa.us 605 : 1525 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
606 : :
607 : : /* If there's nothing in the array, reset the counters to zero. */
3204 rhaas@postgresql.org 608 [ + + ]: 1525 : if (tuple_buffer->nTuples == tuple_buffer->readCounter)
609 : 1505 : tuple_buffer->nTuples = tuple_buffer->readCounter = 0;
610 : :
611 : : /* Try to fill additional slots in the array. */
612 [ + + ]: 15898 : for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
613 : : {
614 : : MinimalTuple tuple;
615 : :
3030 tgl@sss.pgh.pa.us 616 : 14492 : tuple = gm_readnext_tuple(gm_state,
617 : : reader,
618 : : true,
619 : : &tuple_buffer->done);
1978 tmunro@postgresql.or 620 [ + + ]: 14492 : if (!tuple)
3204 rhaas@postgresql.org 621 : 119 : break;
2934 622 : 14373 : tuple_buffer->tuple[i] = tuple;
3204 623 : 14373 : tuple_buffer->nTuples++;
624 : : }
625 : : }
626 : :
627 : : /*
628 : : * Store the next tuple for a given reader into the appropriate slot.
629 : : *
630 : : * Returns true if successful, false if not (either reader is exhausted,
631 : : * or we didn't want to wait for a tuple). Sets done flag if reader
632 : : * is found to be exhausted.
633 : : */
634 : : static bool
635 : 128399 : gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
636 : : {
637 : : GMReaderTupleBuffer *tuple_buffer;
638 : : MinimalTuple tup;
639 : :
640 : : /*
641 : : * If we're being asked to generate a tuple from the leader, then we just
642 : : * call ExecProcNode as normal to produce one.
643 : : */
3029 tgl@sss.pgh.pa.us 644 [ + + ]: 128399 : if (reader == 0)
645 : : {
3204 rhaas@postgresql.org 646 [ + - ]: 112207 : if (gm_state->need_to_scan_locally)
647 : : {
648 : 112207 : PlanState *outerPlan = outerPlanState(gm_state);
649 : : TupleTableSlot *outerTupleSlot;
2791 tgl@sss.pgh.pa.us 650 : 112207 : EState *estate = gm_state->ps.state;
651 : :
652 : : /* Install our DSA area while executing the plan. */
2920 rhaas@postgresql.org 653 [ + - ]: 112207 : estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
3204 654 : 112207 : outerTupleSlot = ExecProcNode(outerPlan);
2920 655 : 112207 : estate->es_query_dsa = NULL;
656 : :
3204 657 [ + + + + ]: 112207 : if (!TupIsNull(outerTupleSlot))
658 : : {
3029 tgl@sss.pgh.pa.us 659 : 112144 : gm_state->gm_slots[0] = outerTupleSlot;
3204 rhaas@postgresql.org 660 : 112144 : return true;
661 : : }
662 : : /* need_to_scan_locally serves as "done" flag for leader */
663 : 63 : gm_state->need_to_scan_locally = false;
664 : : }
665 : 63 : return false;
666 : : }
667 : :
668 : : /* Otherwise, check the state of the relevant tuple buffer. */
3029 tgl@sss.pgh.pa.us 669 : 16192 : tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];
670 : :
3204 rhaas@postgresql.org 671 [ + + ]: 16192 : if (tuple_buffer->nTuples > tuple_buffer->readCounter)
672 : : {
673 : : /* Return any tuple previously read that is still buffered. */
674 : 14352 : tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
675 : : }
676 [ + + ]: 1840 : else if (tuple_buffer->done)
677 : : {
678 : : /* Reader is known to be exhausted. */
679 : 115 : return false;
680 : : }
681 : : else
682 : : {
683 : : /* Read and buffer next tuple. */
3030 tgl@sss.pgh.pa.us 684 : 1725 : tup = gm_readnext_tuple(gm_state,
685 : : reader,
686 : : nowait,
687 : : &tuple_buffer->done);
1978 tmunro@postgresql.or 688 [ + + ]: 1725 : if (!tup)
3030 tgl@sss.pgh.pa.us 689 : 220 : return false;
690 : :
691 : : /*
692 : : * Attempt to read more tuples in nowait mode and store them in the
693 : : * pending-tuple array for the reader.
694 : : */
695 : 1505 : load_tuple_array(gm_state, reader);
696 : : }
697 : :
1978 tmunro@postgresql.or 698 [ - + ]: 15857 : Assert(tup);
699 : :
700 : : /* Build the TupleTableSlot for the given tuple */
1679 tgl@sss.pgh.pa.us 701 : 15857 : ExecStoreMinimalTuple(tup, /* tuple to store */
702 : 15857 : gm_state->gm_slots[reader], /* slot in which to
703 : : * store the tuple */
704 : : true); /* pfree tuple when done with it */
705 : :
3204 rhaas@postgresql.org 706 : 15857 : return true;
707 : : }
708 : :
709 : : /*
710 : : * Attempt to read a tuple from given worker.
711 : : */
712 : : static MinimalTuple
713 : 16217 : gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
714 : : bool *done)
715 : : {
716 : : TupleQueueReader *reader;
717 : : MinimalTuple tup;
718 : :
719 : : /* Check for async events, particularly messages from workers. */
720 [ - + ]: 16217 : CHECK_FOR_INTERRUPTS();
721 : :
722 : : /*
723 : : * Attempt to read a tuple.
724 : : *
725 : : * Note that TupleQueueReaderNext will just return NULL for a worker which
726 : : * fails to initialize. We'll treat that worker as having produced no
727 : : * tuples; WaitForParallelWorkersToFinish will error out when we get
728 : : * there.
729 : : */
3029 tgl@sss.pgh.pa.us 730 : 16217 : reader = gm_state->reader[nreader - 1];
3204 rhaas@postgresql.org 731 : 16217 : tup = TupleQueueReaderNext(reader, nowait, done);
732 : :
733 : : /*
734 : : * Since we'll be buffering these across multiple calls, we need to make a
735 : : * copy.
736 : : */
267 jdavis@postgresql.or 737 [ + + ]: 16217 : return tup ? heap_copy_minimal_tuple(tup, 0) : NULL;
738 : : }
739 : :
740 : : /*
741 : : * We have one slot for each item in the heap array. We use SlotNumber
742 : : * to store slot indexes. This doesn't actually provide any formal
743 : : * type-safety, but it makes the code more self-documenting.
744 : : */
745 : : typedef int32 SlotNumber;
746 : :
747 : : /*
748 : : * Compare the tuples in the two given slots.
749 : : */
750 : : static int32
3204 rhaas@postgresql.org 751 : 148753 : heap_compare_slots(Datum a, Datum b, void *arg)
752 : : {
753 : 148753 : GatherMergeState *node = (GatherMergeState *) arg;
754 : 148753 : SlotNumber slot1 = DatumGetInt32(a);
755 : 148753 : SlotNumber slot2 = DatumGetInt32(b);
756 : :
757 : 148753 : TupleTableSlot *s1 = node->gm_slots[slot1];
758 : 148753 : TupleTableSlot *s2 = node->gm_slots[slot2];
759 : : int nkey;
760 : :
761 [ + - - + ]: 148753 : Assert(!TupIsNull(s1));
762 [ + - - + ]: 148753 : Assert(!TupIsNull(s2));
763 : :
764 [ + + ]: 220163 : for (nkey = 0; nkey < node->gm_nkeys; nkey++)
765 : : {
766 : 148753 : SortSupport sortKey = node->gm_sortkeys + nkey;
767 : 148753 : AttrNumber attno = sortKey->ssup_attno;
768 : : Datum datum1,
769 : : datum2;
770 : : bool isNull1,
771 : : isNull2;
772 : : int compare;
773 : :
774 : 148753 : datum1 = slot_getattr(s1, attno, &isNull1);
775 : 148753 : datum2 = slot_getattr(s2, attno, &isNull2);
776 : :
777 : 148753 : compare = ApplySortComparator(datum1, isNull1,
778 : : datum2, isNull2,
779 : : sortKey);
780 [ + + ]: 148753 : if (compare != 0)
781 : : {
2629 tgl@sss.pgh.pa.us 782 [ + + ]: 77343 : INVERT_COMPARE_RESULT(compare);
783 : 77343 : return compare;
784 : : }
785 : : }
3204 rhaas@postgresql.org 786 : 71410 : return 0;
787 : : }
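
/*
 * A minimal standalone sketch (not part of the original file) of why the
 * comparator above ends with INVERT_COMPARE_RESULT: binaryheap keeps the
 * element that compares largest at its root, so inverting the sort
 * comparison leaves the smallest-sorting participant on top, which is
 * what an ascending merge needs.  The function names are hypothetical.
 */
static int32
invert_demo_cmp(Datum a, Datum b, void *arg)
{
	int32		av = DatumGetInt32(a);
	int32		bv = DatumGetInt32(b);
	int			compare = (av < bv) ? -1 : ((av > bv) ? 1 : 0);

	INVERT_COMPARE_RESULT(compare);
	return compare;
}

static void
invert_demo_drain(void)
{
	binaryheap *heap = binaryheap_allocate(3, invert_demo_cmp, NULL);

	binaryheap_add_unordered(heap, Int32GetDatum(42));
	binaryheap_add_unordered(heap, Int32GetDatum(7));
	binaryheap_add_unordered(heap, Int32GetDatum(19));
	binaryheap_build(heap);

	/* Removes 7, then 19, then 42: ascending order despite the max-heap. */
	while (!binaryheap_empty(heap))
		elog(LOG, "%d", DatumGetInt32(binaryheap_remove_first(heap)));
}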