Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeBitmapIndexscan.h"
32 : : #include "executor/nodeCustom.h"
33 : : #include "executor/nodeForeignscan.h"
34 : : #include "executor/nodeHash.h"
35 : : #include "executor/nodeHashjoin.h"
36 : : #include "executor/nodeIncrementalSort.h"
37 : : #include "executor/nodeIndexonlyscan.h"
38 : : #include "executor/nodeIndexscan.h"
39 : : #include "executor/nodeMemoize.h"
40 : : #include "executor/nodeSeqscan.h"
41 : : #include "executor/nodeSort.h"
42 : : #include "executor/nodeSubplan.h"
43 : : #include "executor/nodeTidrangescan.h"
44 : : #include "executor/tqueue.h"
45 : : #include "jit/jit.h"
46 : : #include "nodes/nodeFuncs.h"
47 : : #include "pgstat.h"
48 : : #include "storage/proc.h"
49 : : #include "tcop/tcopprot.h"
50 : : #include "utils/datum.h"
51 : : #include "utils/dsa.h"
52 : : #include "utils/lsyscache.h"
53 : : #include "utils/snapmgr.h"
54 : :
55 : : /*
56 : : * Magic numbers for parallel executor communication. We use constants
57 : : * greater than any 32-bit integer here so that values < 2^32 can be used
58 : : * by individual parallel nodes to store their own state.
59 : : */
60 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
61 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
62 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
63 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
64 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
65 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
66 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
67 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
68 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
69 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
70 : :
71 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
72 : :
73 : : /*
74 : : * Fixed-size random stuff that we need to pass to parallel workers.
75 : : */
76 : : typedef struct FixedParallelExecutorState
77 : : {
78 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
79 : : dsa_pointer param_exec;
80 : : int eflags;
81 : : int jit_flags;
82 : : } FixedParallelExecutorState;
83 : :
84 : : /*
85 : : * DSM structure for accumulating per-PlanState instrumentation.
86 : : *
87 : : * instrument_options: Same meaning here as in instrument.c.
88 : : *
89 : : * instrument_offset: Offset, relative to the start of this structure,
90 : : * of the first Instrumentation object. This will depend on the length of
91 : : * the plan_node_id array.
92 : : *
93 : : * num_workers: Number of workers.
94 : : *
95 : : * num_plan_nodes: Number of plan nodes.
96 : : *
97 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
98 : : * from parallel workers. The length of this array is given by num_plan_nodes.
99 : : */
100 : : struct SharedExecutorInstrumentation
101 : : {
102 : : int instrument_options;
103 : : int instrument_offset;
104 : : int num_workers;
105 : : int num_plan_nodes;
106 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
107 : : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
108 : : };
109 : : #define GetInstrumentationArray(sei) \
110 : : (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
111 : : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
112 : :
113 : : /* Context object for ExecParallelEstimate. */
114 : : typedef struct ExecParallelEstimateContext
115 : : {
116 : : ParallelContext *pcxt;
117 : : int nnodes;
118 : : } ExecParallelEstimateContext;
119 : :
120 : : /* Context object for ExecParallelInitializeDSM. */
121 : : typedef struct ExecParallelInitializeDSMContext
122 : : {
123 : : ParallelContext *pcxt;
124 : : SharedExecutorInstrumentation *instrumentation;
125 : : int nnodes;
126 : : } ExecParallelInitializeDSMContext;
127 : :
128 : : /* Helper functions that run in the parallel leader. */
129 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
130 : : static bool ExecParallelEstimate(PlanState *planstate,
131 : : ExecParallelEstimateContext *e);
132 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
133 : : ExecParallelInitializeDSMContext *d);
134 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
135 : : bool reinitialize);
136 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
137 : : ParallelContext *pcxt);
138 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
139 : : SharedExecutorInstrumentation *instrumentation);
140 : :
141 : : /* Helper function that runs in the parallel worker. */
142 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
143 : :
144 : : /*
145 : : * Create a serialized representation of the plan to be sent to each worker.
146 : : */
147 : : static char *
3819 rhaas@postgresql.org 148 :CBC 384 : ExecSerializePlan(Plan *plan, EState *estate)
149 : : {
150 : : PlannedStmt *pstmt;
151 : : ListCell *lc;
152 : :
153 : : /* We can't scribble on the original plan, so make a copy. */
3821 154 : 384 : plan = copyObject(plan);
155 : :
156 : : /*
157 : : * The worker will start its own copy of the executor, and that copy will
158 : : * insert a junk filter if the toplevel node has any resjunk entries. We
159 : : * don't want that to happen, because while resjunk columns shouldn't be
160 : : * sent back to the user, here the tuples are coming back to another
161 : : * backend which may very well need them. So mutate the target list
162 : : * accordingly. This is sort of a hack; there might be better ways to do
163 : : * this...
164 : : */
3259 tgl@sss.pgh.pa.us 165 [ + + + + : 1045 : foreach(lc, plan->targetlist)
+ + ]
166 : : {
167 : 661 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
168 : :
3821 rhaas@postgresql.org 169 : 661 : tle->resjunk = false;
170 : : }
171 : :
172 : : /*
173 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
174 : : * for our purposes, but the worker will need at least a minimal
175 : : * PlannedStmt to start the executor.
176 : : */
177 : 384 : pstmt = makeNode(PlannedStmt);
178 : 384 : pstmt->commandType = CMD_SELECT;
1790 bruce@momjian.us 179 : 384 : pstmt->queryId = pgstat_get_my_query_id();
356 michael@paquier.xyz 180 : 384 : pstmt->planId = pgstat_get_my_plan_id();
3530 tgl@sss.pgh.pa.us 181 : 384 : pstmt->hasReturning = false;
182 : 384 : pstmt->hasModifyingCTE = false;
183 : 384 : pstmt->canSetTag = true;
184 : 384 : pstmt->transientPlan = false;
185 : 384 : pstmt->dependsOnRole = false;
186 : 384 : pstmt->parallelModeNeeded = false;
3821 rhaas@postgresql.org 187 : 384 : pstmt->planTree = plan;
409 amitlan@postgresql.o 188 : 384 : pstmt->partPruneInfos = estate->es_part_prune_infos;
3819 rhaas@postgresql.org 189 : 384 : pstmt->rtable = estate->es_range_table;
401 amitlan@postgresql.o 190 : 384 : pstmt->unprunableRelids = estate->es_unpruned_relids;
1195 alvherre@alvh.no-ip. 191 : 384 : pstmt->permInfos = estate->es_rteperminfos;
3821 rhaas@postgresql.org 192 : 384 : pstmt->resultRelations = NIL;
2286 tgl@sss.pgh.pa.us 193 : 384 : pstmt->appendRelations = NIL;
227 michael@paquier.xyz 194 :GNC 384 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
195 : :
196 : : /*
197 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
198 : : * for unsafe ones (so that the list indexes of the safe ones are
199 : : * preserved). This positively ensures that the worker won't try to run,
200 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
201 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
202 : : */
3259 tgl@sss.pgh.pa.us 203 :CBC 384 : pstmt->subplans = NIL;
204 [ + + + + : 411 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
205 : : {
206 : 27 : Plan *subplan = (Plan *) lfirst(lc);
207 : :
208 [ + - + + ]: 27 : if (subplan && !subplan->parallel_safe)
209 : 6 : subplan = NULL;
210 : 27 : pstmt->subplans = lappend(pstmt->subplans, subplan);
211 : : }
212 : :
3821 rhaas@postgresql.org 213 : 384 : pstmt->rewindPlanIDs = NULL;
214 : 384 : pstmt->rowMarks = NIL;
215 : 384 : pstmt->relationOids = NIL;
216 : 384 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
3044 217 : 384 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
3347 tgl@sss.pgh.pa.us 218 : 384 : pstmt->utilityStmt = NULL;
219 : 384 : pstmt->stmt_location = -1;
220 : 384 : pstmt->stmt_len = -1;
221 : :
222 : : /* Return serialized copy of our dummy PlannedStmt. */
3821 rhaas@postgresql.org 223 : 384 : return nodeToString(pstmt);
224 : : }
225 : :
226 : : /*
227 : : * Parallel-aware plan nodes (and occasionally others) may need some state
228 : : * which is shared across all parallel workers. Before we size the DSM, give
229 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
230 : : * &pcxt->estimator.
231 : : *
232 : : * While we're at it, count the number of PlanState nodes in the tree, so
233 : : * we know how many Instrumentation structures we need.
234 : : */
235 : : static bool
236 : 1545 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
237 : : {
238 [ - + ]: 1545 : if (planstate == NULL)
3821 rhaas@postgresql.org 239 :UBC 0 : return false;
240 : :
241 : : /* Count this node. */
3821 rhaas@postgresql.org 242 :CBC 1545 : e->nnodes++;
243 : :
3120 244 [ + + + + : 1545 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
245 : : {
246 : 588 : case T_SeqScanState:
247 [ + + ]: 588 : if (planstate->plan->parallel_aware)
3707 248 : 469 : ExecSeqScanEstimate((SeqScanState *) planstate,
249 : : e->pcxt);
3120 250 : 588 : break;
251 : 147 : case T_IndexScanState:
252 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 253 : 147 : ExecIndexScanEstimate((IndexScanState *) planstate,
254 : : e->pcxt);
3120 rhaas@postgresql.org 255 : 147 : break;
256 : 29 : case T_IndexOnlyScanState:
257 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 258 : 29 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
259 : : e->pcxt);
260 : 29 : break;
261 : 10 : case T_BitmapIndexScanState:
262 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
263 : 10 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
264 : : e->pcxt);
3120 rhaas@postgresql.org 265 : 10 : break;
3120 rhaas@postgresql.org 266 :UBC 0 : case T_ForeignScanState:
267 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 268 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
269 : : e->pcxt);
3120 270 : 0 : break;
108 drowley@postgresql.o 271 :GNC 12 : case T_TidRangeScanState:
272 [ + - ]: 12 : if (planstate->plan->parallel_aware)
273 : 12 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
274 : : e->pcxt);
275 : 12 : break;
3022 rhaas@postgresql.org 276 :CBC 93 : case T_AppendState:
277 [ + + ]: 93 : if (planstate->plan->parallel_aware)
278 : 69 : ExecAppendEstimate((AppendState *) planstate,
279 : : e->pcxt);
280 : 93 : break;
3120 rhaas@postgresql.org 281 :UBC 0 : case T_CustomScanState:
282 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 283 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
284 : : e->pcxt);
3120 285 : 0 : break;
3120 rhaas@postgresql.org 286 :CBC 10 : case T_BitmapHeapScanState:
287 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3294 288 : 9 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
289 : : e->pcxt);
3120 290 : 10 : break;
3007 andres@anarazel.de 291 : 99 : case T_HashJoinState:
292 [ + + ]: 99 : if (planstate->plan->parallel_aware)
293 : 63 : ExecHashJoinEstimate((HashJoinState *) planstate,
294 : : e->pcxt);
295 : 99 : break;
3022 296 : 99 : case T_HashState:
297 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
298 : 99 : ExecHashEstimate((HashState *) planstate, e->pcxt);
299 : 99 : break;
3120 rhaas@postgresql.org 300 : 79 : case T_SortState:
301 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
302 : 79 : ExecSortEstimate((SortState *) planstate, e->pcxt);
3119 tgl@sss.pgh.pa.us 303 : 79 : break;
2169 tomas.vondra@postgre 304 :UBC 0 : case T_IncrementalSortState:
305 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
306 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
307 : 0 : break;
2095 drowley@postgresql.o 308 :CBC 293 : case T_AggState:
309 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
310 : 293 : ExecAggEstimate((AggState *) planstate, e->pcxt);
311 : 293 : break;
1705 312 : 3 : case T_MemoizeState:
313 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
314 : 3 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
1808 315 : 3 : break;
3120 rhaas@postgresql.org 316 : 83 : default:
317 : 83 : break;
318 : : }
319 : :
3821 320 : 1545 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
321 : : }
322 : :
323 : : /*
324 : : * Estimate the amount of space required to serialize the indicated parameters.
325 : : */
326 : : static Size
3041 327 : 12 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
328 : : {
329 : : int paramid;
330 : 12 : Size sz = sizeof(int);
331 : :
332 : 12 : paramid = -1;
333 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
334 : : {
335 : : Oid typeOid;
336 : : int16 typLen;
337 : : bool typByVal;
338 : : ParamExecData *prm;
339 : :
340 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
341 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
342 : : paramid);
343 : :
344 : 15 : sz = add_size(sz, sizeof(int)); /* space for paramid */
345 : :
346 : : /* space for datum/isnull */
347 [ + - ]: 15 : if (OidIsValid(typeOid))
348 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
349 : : else
350 : : {
351 : : /* If no type OID, assume by-value, like copyParamList does. */
3041 rhaas@postgresql.org 352 :UBC 0 : typLen = sizeof(Datum);
353 : 0 : typByVal = true;
354 : : }
3041 rhaas@postgresql.org 355 :CBC 15 : sz = add_size(sz,
356 : 15 : datumEstimateSpace(prm->value, prm->isnull,
357 : : typByVal, typLen));
358 : : }
359 : 12 : return sz;
360 : : }
361 : :
362 : : /*
363 : : * Serialize specified PARAM_EXEC parameters.
364 : : *
365 : : * We write the number of parameters first, as a 4-byte integer, and then
366 : : * write details for each parameter in turn. The details for each parameter
367 : : * consist of a 4-byte paramid (location of param in execution time internal
368 : : * parameter array) and then the datum as serialized by datumSerialize().
369 : : */
370 : : static dsa_pointer
3009 371 : 12 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
372 : : {
373 : : Size size;
374 : : int nparams;
375 : : int paramid;
376 : : ParamExecData *prm;
377 : : dsa_pointer handle;
378 : : char *start_address;
379 : :
380 : : /* Allocate enough space for the current parameter values. */
3041 381 : 12 : size = EstimateParamExecSpace(estate, params);
3009 382 : 12 : handle = dsa_allocate(area, size);
383 : 12 : start_address = dsa_get_address(area, handle);
384 : :
385 : : /* First write the number of parameters as a 4-byte integer. */
3041 386 : 12 : nparams = bms_num_members(params);
387 : 12 : memcpy(start_address, &nparams, sizeof(int));
388 : 12 : start_address += sizeof(int);
389 : :
390 : : /* Write details for each parameter in turn. */
391 : 12 : paramid = -1;
392 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
393 : : {
394 : : Oid typeOid;
395 : : int16 typLen;
396 : : bool typByVal;
397 : :
398 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
399 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
400 : : paramid);
401 : :
402 : : /* Write paramid. */
403 : 15 : memcpy(start_address, ¶mid, sizeof(int));
404 : 15 : start_address += sizeof(int);
405 : :
406 : : /* Write datum/isnull */
407 [ + - ]: 15 : if (OidIsValid(typeOid))
408 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
409 : : else
410 : : {
411 : : /* If no type OID, assume by-value, like copyParamList does. */
3041 rhaas@postgresql.org 412 :UBC 0 : typLen = sizeof(Datum);
413 : 0 : typByVal = true;
414 : : }
3041 rhaas@postgresql.org 415 :CBC 15 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
416 : : &start_address);
417 : : }
418 : :
419 : 12 : return handle;
420 : : }
421 : :
422 : : /*
423 : : * Restore specified PARAM_EXEC parameters.
424 : : */
425 : : static void
426 : 36 : RestoreParamExecParams(char *start_address, EState *estate)
427 : : {
428 : : int nparams;
429 : : int i;
430 : : int paramid;
431 : :
432 : 36 : memcpy(&nparams, start_address, sizeof(int));
433 : 36 : start_address += sizeof(int);
434 : :
435 [ + + ]: 78 : for (i = 0; i < nparams; i++)
436 : : {
437 : : ParamExecData *prm;
438 : :
439 : : /* Read paramid */
440 : 42 : memcpy(¶mid, start_address, sizeof(int));
441 : 42 : start_address += sizeof(int);
442 : 42 : prm = &(estate->es_param_exec_vals[paramid]);
443 : :
444 : : /* Read datum/isnull. */
445 : 42 : prm->value = datumRestore(&start_address, &prm->isnull);
446 : 42 : prm->execPlan = NULL;
447 : : }
448 : 36 : }
449 : :
450 : : /*
451 : : * Initialize the dynamic shared memory segment that will be used to control
452 : : * parallel execution.
453 : : */
454 : : static bool
3821 455 : 1545 : ExecParallelInitializeDSM(PlanState *planstate,
456 : : ExecParallelInitializeDSMContext *d)
457 : : {
458 [ - + ]: 1545 : if (planstate == NULL)
3821 rhaas@postgresql.org 459 :UBC 0 : return false;
460 : :
461 : : /* If instrumentation is enabled, initialize slot for this node. */
3821 rhaas@postgresql.org 462 [ + + ]:CBC 1545 : if (d->instrumentation != NULL)
3749 463 : 513 : d->instrumentation->plan_node_id[d->nnodes] =
464 : 513 : planstate->plan->plan_node_id;
465 : :
466 : : /* Count this node. */
3821 467 : 1545 : d->nnodes++;
468 : :
469 : : /*
470 : : * Call initializers for DSM-using plan nodes.
471 : : *
472 : : * Most plan nodes won't do anything here, but plan nodes that allocated
473 : : * DSM may need to initialize shared state in the DSM before parallel
474 : : * workers are launched. They can allocate the space they previously
475 : : * estimated using shm_toc_allocate, and add the keys they previously
476 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
477 : : */
3120 478 [ + + + + : 1545 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
479 : : {
480 : 588 : case T_SeqScanState:
481 [ + + ]: 588 : if (planstate->plan->parallel_aware)
3707 482 : 469 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
483 : : d->pcxt);
3120 484 : 588 : break;
485 : 147 : case T_IndexScanState:
486 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 487 : 147 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
3120 rhaas@postgresql.org 488 : 147 : break;
489 : 29 : case T_IndexOnlyScanState:
490 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 491 : 29 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
492 : : d->pcxt);
493 : 29 : break;
494 : 10 : case T_BitmapIndexScanState:
495 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
496 : 10 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
3120 rhaas@postgresql.org 497 : 10 : break;
3120 rhaas@postgresql.org 498 :UBC 0 : case T_ForeignScanState:
499 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 500 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
501 : : d->pcxt);
3120 502 : 0 : break;
108 drowley@postgresql.o 503 :GNC 12 : case T_TidRangeScanState:
504 [ + - ]: 12 : if (planstate->plan->parallel_aware)
505 : 12 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
506 : : d->pcxt);
507 : 12 : break;
3022 rhaas@postgresql.org 508 :CBC 93 : case T_AppendState:
509 [ + + ]: 93 : if (planstate->plan->parallel_aware)
510 : 69 : ExecAppendInitializeDSM((AppendState *) planstate,
511 : : d->pcxt);
512 : 93 : break;
3120 rhaas@postgresql.org 513 :UBC 0 : case T_CustomScanState:
514 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 515 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
516 : : d->pcxt);
3120 517 : 0 : break;
3120 rhaas@postgresql.org 518 :CBC 10 : case T_BitmapHeapScanState:
519 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3294 520 : 9 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
521 : : d->pcxt);
3120 522 : 10 : break;
3007 andres@anarazel.de 523 : 99 : case T_HashJoinState:
524 [ + + ]: 99 : if (planstate->plan->parallel_aware)
525 : 63 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
526 : : d->pcxt);
527 : 99 : break;
3022 528 : 99 : case T_HashState:
529 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
530 : 99 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
531 : 99 : break;
3120 rhaas@postgresql.org 532 : 79 : case T_SortState:
533 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
534 : 79 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
3119 tgl@sss.pgh.pa.us 535 : 79 : break;
2169 tomas.vondra@postgre 536 :UBC 0 : case T_IncrementalSortState:
537 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
538 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
539 : 0 : break;
2095 drowley@postgresql.o 540 :CBC 293 : case T_AggState:
541 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
542 : 293 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
543 : 293 : break;
1705 544 : 3 : case T_MemoizeState:
545 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
546 : 3 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
1808 547 : 3 : break;
3120 rhaas@postgresql.org 548 : 83 : default:
549 : 83 : break;
550 : : }
551 : :
3821 552 : 1545 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
553 : : }
554 : :
555 : : /*
556 : : * It sets up the response queues for backend workers to return tuples
557 : : * to the main backend and start the workers.
558 : : */
559 : : static shm_mq_handle **
3789 560 : 513 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
561 : : {
562 : : shm_mq_handle **responseq;
563 : : char *tqueuespace;
564 : : int i;
565 : :
566 : : /* Skip this if no workers. */
3821 567 [ - + ]: 513 : if (pcxt->nworkers == 0)
3821 rhaas@postgresql.org 568 :UBC 0 : return NULL;
569 : :
570 : : /* Allocate memory for shared memory queue handles. */
571 : : responseq = (shm_mq_handle **)
3821 rhaas@postgresql.org 572 :CBC 513 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
573 : :
574 : : /*
575 : : * If not reinitializing, allocate space from the DSM for the queues;
576 : : * otherwise, find the already allocated space.
577 : : */
3789 578 [ + + ]: 513 : if (!reinitialize)
579 : : tqueuespace =
580 : 384 : shm_toc_allocate(pcxt->toc,
581 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
3600 582 : 384 : pcxt->nworkers));
583 : : else
3205 tgl@sss.pgh.pa.us 584 : 129 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
585 : :
586 : : /* Create the queues, and become the receiver for each. */
3821 rhaas@postgresql.org 587 [ + + ]: 1898 : for (i = 0; i < pcxt->nworkers; ++i)
588 : : {
589 : : shm_mq *mq;
590 : :
3600 591 : 1385 : mq = shm_mq_create(tqueuespace +
592 : 1385 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
593 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
594 : :
3821 595 : 1385 : shm_mq_set_receiver(mq, MyProc);
596 : 1385 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
597 : : }
598 : :
599 : : /* Add array of queues to shm_toc, so others can find it. */
3789 600 [ + + ]: 513 : if (!reinitialize)
601 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
602 : :
603 : : /* Return array of handles. */
3821 604 : 513 : return responseq;
605 : : }
606 : :
607 : : /*
608 : : * Sets up the required infrastructure for backend workers to perform
609 : : * execution and return results to the main backend.
610 : : */
611 : : ParallelExecutorInfo *
3041 612 : 384 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
613 : : Bitmapset *sendParams, int nworkers,
614 : : int64 tuples_needed)
615 : : {
616 : : ParallelExecutorInfo *pei;
617 : : ParallelContext *pcxt;
618 : : ExecParallelEstimateContext e;
619 : : ExecParallelInitializeDSMContext d;
620 : : FixedParallelExecutorState *fpes;
621 : : char *pstmt_data;
622 : : char *pstmt_space;
623 : : char *paramlistinfo_space;
624 : : BufferUsage *bufusage_space;
625 : : WalUsage *walusage_space;
3821 626 : 384 : SharedExecutorInstrumentation *instrumentation = NULL;
2728 andres@anarazel.de 627 : 384 : SharedJitInstrumentation *jit_instrumentation = NULL;
628 : : int pstmt_len;
629 : : int paramlistinfo_len;
3821 rhaas@postgresql.org 630 : 384 : int instrumentation_len = 0;
2728 andres@anarazel.de 631 : 384 : int jit_instrumentation_len = 0;
3749 rhaas@postgresql.org 632 : 384 : int instrument_offset = 0;
3373 633 : 384 : Size dsa_minsize = dsa_minimum_size();
634 : : char *query_string;
635 : : int query_len;
636 : :
637 : : /*
638 : : * Force any initplan outputs that we're going to pass to workers to be
639 : : * evaluated, if they weren't already.
640 : : *
641 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
642 : : * That risks intra-query memory leakage, since we might pass through here
643 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
644 : : * doesn't normally leak any memory in the context (see its comments), so
645 : : * it doesn't seem worth complicating this function's API to pass it a
646 : : * shorter-lived ExprContext. This might need to change someday.
647 : : */
2738 tgl@sss.pgh.pa.us 648 [ + + ]: 384 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
649 : :
650 : : /* Allocate object for return value. */
95 michael@paquier.xyz 651 :GNC 384 : pei = palloc0_object(ParallelExecutorInfo);
3770 rhaas@postgresql.org 652 :CBC 384 : pei->finished = false;
3821 653 : 384 : pei->planstate = planstate;
654 : :
655 : : /* Fix up and serialize plan to be sent to workers. */
3819 656 : 384 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
657 : :
658 : : /* Create a parallel context. */
2557 tmunro@postgresql.or 659 : 384 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
3821 rhaas@postgresql.org 660 : 384 : pei->pcxt = pcxt;
661 : :
662 : : /*
663 : : * Before telling the parallel context to create a dynamic shared memory
664 : : * segment, we need to figure out how big it should be. Estimate space
665 : : * for the various things we need to store.
666 : : */
667 : :
668 : : /* Estimate space for fixed-size state. */
3120 669 : 384 : shm_toc_estimate_chunk(&pcxt->estimator,
670 : : sizeof(FixedParallelExecutorState));
671 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
672 : :
673 : : /* Estimate space for query text. */
1795 tgl@sss.pgh.pa.us 674 : 384 : query_len = strlen(estate->es_sourceText);
3007 rhaas@postgresql.org 675 : 384 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
3308 676 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
677 : :
678 : : /* Estimate space for serialized PlannedStmt. */
3821 679 : 384 : pstmt_len = strlen(pstmt_data) + 1;
680 : 384 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
681 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
682 : :
683 : : /* Estimate space for serialized ParamListInfo. */
3041 684 : 384 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
685 : 384 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
3821 686 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
687 : :
688 : : /*
689 : : * Estimate space for BufferUsage.
690 : : *
691 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
692 : : * we could skip this. But we have no way of knowing whether anyone's
693 : : * looking at pgBufferUsage, so do it unconditionally.
694 : : */
695 : 384 : shm_toc_estimate_chunk(&pcxt->estimator,
696 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
697 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
698 : :
699 : : /*
700 : : * Same thing for WalUsage.
701 : : */
2171 akapila@postgresql.o 702 : 384 : shm_toc_estimate_chunk(&pcxt->estimator,
703 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
704 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
705 : :
706 : : /* Estimate space for tuple queues. */
3821 rhaas@postgresql.org 707 : 384 : shm_toc_estimate_chunk(&pcxt->estimator,
708 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
709 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
710 : :
711 : : /*
712 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
713 : : * count of how many PlanState nodes there are.
714 : : */
715 : 384 : e.pcxt = pcxt;
716 : 384 : e.nnodes = 0;
717 : 384 : ExecParallelEstimate(planstate, &e);
718 : :
719 : : /* Estimate space for instrumentation, if required. */
720 [ + + ]: 384 : if (estate->es_instrument)
721 : : {
722 : 90 : instrumentation_len =
723 : : offsetof(SharedExecutorInstrumentation, plan_node_id) +
3603 724 : 90 : sizeof(int) * e.nnodes;
3749 725 : 90 : instrumentation_len = MAXALIGN(instrumentation_len);
726 : 90 : instrument_offset = instrumentation_len;
3600 727 : 90 : instrumentation_len +=
728 : 90 : mul_size(sizeof(Instrumentation),
729 : 90 : mul_size(e.nnodes, nworkers));
3821 730 : 90 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
731 : 90 : shm_toc_estimate_keys(&pcxt->estimator, 1);
732 : :
733 : : /* Estimate space for JIT instrumentation, if required. */
2728 andres@anarazel.de 734 [ - + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
735 : : {
2728 andres@anarazel.de 736 :LBC (12) : jit_instrumentation_len =
737 : (12) : offsetof(SharedJitInstrumentation, jit_instr) +
738 : : sizeof(JitInstrumentation) * nworkers;
739 : (12) : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
740 : (12) : shm_toc_estimate_keys(&pcxt->estimator, 1);
741 : : }
742 : : }
743 : :
744 : : /* Estimate space for DSA area. */
3373 rhaas@postgresql.org 745 :CBC 384 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
746 : 384 : shm_toc_estimate_keys(&pcxt->estimator, 1);
747 : :
748 : : /*
749 : : * InitializeParallelDSM() passes the active snapshot to the parallel
750 : : * worker, which uses it to set es_snapshot. Make sure we don't set
751 : : * es_snapshot differently in the child.
752 : : */
731 heikki.linnakangas@i 753 [ - + ]: 384 : Assert(GetActiveSnapshot() == estate->es_snapshot);
754 : :
755 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
3821 rhaas@postgresql.org 756 : 384 : InitializeParallelDSM(pcxt);
757 : :
758 : : /*
759 : : * OK, now we have a dynamic shared memory segment, and it should be big
760 : : * enough to store all of the data we estimated we would want to put into
761 : : * it, plus whatever general stuff (not specifically executor-related) the
762 : : * ParallelContext itself needs to store there. None of the space we
763 : : * asked for has been allocated or initialized yet, though, so do that.
764 : : */
765 : :
766 : : /* Store fixed-size state. */
3120 767 : 384 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
768 : 384 : fpes->tuples_needed = tuples_needed;
3041 769 : 384 : fpes->param_exec = InvalidDsaPointer;
3037 770 : 384 : fpes->eflags = estate->es_top_eflags;
2915 andres@anarazel.de 771 : 384 : fpes->jit_flags = estate->es_jit_flags;
3120 rhaas@postgresql.org 772 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
773 : :
774 : : /* Store query string */
3007 775 : 384 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
1795 tgl@sss.pgh.pa.us 776 : 384 : memcpy(query_string, estate->es_sourceText, query_len + 1);
3308 rhaas@postgresql.org 777 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
778 : :
779 : : /* Store serialized PlannedStmt. */
3821 780 : 384 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
781 : 384 : memcpy(pstmt_space, pstmt_data, pstmt_len);
782 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
783 : :
784 : : /* Store serialized ParamListInfo. */
3041 785 : 384 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
786 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
787 : 384 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
788 : :
789 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
3821 790 : 384 : bufusage_space = shm_toc_allocate(pcxt->toc,
3189 tgl@sss.pgh.pa.us 791 : 384 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
3821 rhaas@postgresql.org 792 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
793 : 384 : pei->buffer_usage = bufusage_space;
794 : :
795 : : /* Same for WalUsage. */
2171 akapila@postgresql.o 796 : 384 : walusage_space = shm_toc_allocate(pcxt->toc,
797 : 384 : mul_size(sizeof(WalUsage), pcxt->nworkers));
798 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
799 : 384 : pei->wal_usage = walusage_space;
800 : :
801 : : /* Set up the tuple queues that the workers will write into. */
3789 rhaas@postgresql.org 802 : 384 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
803 : :
804 : : /* We don't need the TupleQueueReaders yet, though. */
3117 tgl@sss.pgh.pa.us 805 : 384 : pei->reader = NULL;
806 : :
807 : : /*
808 : : * If instrumentation options were supplied, allocate space for the data.
809 : : * It only gets partially initialized here; the rest happens during
810 : : * ExecParallelInitializeDSM.
811 : : */
3821 rhaas@postgresql.org 812 [ + + ]: 384 : if (estate->es_instrument)
813 : : {
814 : : Instrumentation *instrument;
815 : : int i;
816 : :
817 : 90 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
818 : 90 : instrumentation->instrument_options = estate->es_instrument;
3749 819 : 90 : instrumentation->instrument_offset = instrument_offset;
820 : 90 : instrumentation->num_workers = nworkers;
821 : 90 : instrumentation->num_plan_nodes = e.nnodes;
822 : 90 : instrument = GetInstrumentationArray(instrumentation);
823 [ + + ]: 930 : for (i = 0; i < nworkers * e.nnodes; ++i)
824 : 840 : InstrInit(&instrument[i], estate->es_instrument);
3821 825 : 90 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
826 : : instrumentation);
827 : 90 : pei->instrumentation = instrumentation;
828 : :
2728 andres@anarazel.de 829 [ - + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
830 : : {
2728 andres@anarazel.de 831 :LBC (12) : jit_instrumentation = shm_toc_allocate(pcxt->toc,
832 : : jit_instrumentation_len);
833 : (12) : jit_instrumentation->num_workers = nworkers;
834 : (12) : memset(jit_instrumentation->jit_instr, 0,
835 : : sizeof(JitInstrumentation) * nworkers);
836 : (12) : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
837 : : jit_instrumentation);
838 : (12) : pei->jit_instrumentation = jit_instrumentation;
839 : : }
840 : : }
841 : :
842 : : /*
843 : : * Create a DSA area that can be used by the leader and all workers.
844 : : * (However, if we failed to create a DSM and are using private memory
845 : : * instead, then skip this.)
846 : : */
3373 rhaas@postgresql.org 847 [ + - ]:CBC 384 : if (pcxt->seg != NULL)
848 : : {
849 : : char *area_space;
850 : :
851 : 384 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
852 : 384 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
853 : 384 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
854 : : LWTRANCHE_PARALLEL_QUERY_DSA,
855 : : pcxt->seg);
856 : :
857 : : /*
858 : : * Serialize parameters, if any, using DSA storage. We don't dare use
859 : : * the main parallel query DSM for this because we might relaunch
860 : : * workers after the values have changed (and thus the amount of
861 : : * storage required has changed).
862 : : */
3041 863 [ + + ]: 384 : if (!bms_is_empty(sendParams))
864 : : {
3009 865 : 12 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
866 : : pei->area);
3041 867 : 12 : fpes->param_exec = pei->param_exec;
868 : : }
869 : : }
870 : :
871 : : /*
872 : : * Give parallel-aware nodes a chance to initialize their shared data.
873 : : * This also initializes the elements of instrumentation->ps_instrument,
874 : : * if it exists.
875 : : */
3821 876 : 384 : d.pcxt = pcxt;
877 : 384 : d.instrumentation = instrumentation;
878 : 384 : d.nnodes = 0;
879 : :
880 : : /* Install our DSA area while initializing the plan. */
3009 881 : 384 : estate->es_query_dsa = pei->area;
3821 882 : 384 : ExecParallelInitializeDSM(planstate, &d);
3009 883 : 384 : estate->es_query_dsa = NULL;
884 : :
885 : : /*
886 : : * Make sure that the world hasn't shifted under our feet. This could
887 : : * probably just be an Assert(), but let's be conservative for now.
888 : : */
3821 889 [ - + ]: 384 : if (e.nnodes != d.nnodes)
3821 rhaas@postgresql.org 890 [ # # ]:UBC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
891 : :
892 : : /* OK, we're ready to rock and roll. */
3821 rhaas@postgresql.org 893 :CBC 384 : return pei;
894 : : }
895 : :
896 : : /*
897 : : * Set up tuple queue readers to read the results of a parallel subplan.
898 : : *
899 : : * This is separate from ExecInitParallelPlan() because we can launch the
900 : : * worker processes and let them start doing something before we do this.
901 : : */
902 : : void
3104 andres@anarazel.de 903 : 504 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
904 : : {
3117 tgl@sss.pgh.pa.us 905 : 504 : int nworkers = pei->pcxt->nworkers_launched;
906 : : int i;
907 : :
908 [ - + ]: 504 : Assert(pei->reader == NULL);
909 : :
910 [ + - ]: 504 : if (nworkers > 0)
911 : : {
912 : 504 : pei->reader = (TupleQueueReader **)
913 : 504 : palloc(nworkers * sizeof(TupleQueueReader *));
914 : :
915 [ + + ]: 1849 : for (i = 0; i < nworkers; i++)
916 : : {
917 : 1345 : shm_mq_set_handle(pei->tqueue[i],
918 : 1345 : pei->pcxt->worker[i].bgwhandle);
3104 andres@anarazel.de 919 : 1345 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
920 : : }
921 : : }
3117 tgl@sss.pgh.pa.us 922 : 504 : }
923 : :
924 : : /*
925 : : * Re-initialize the parallel executor shared memory state before launching
926 : : * a fresh batch of workers.
927 : : */
928 : : void
3119 929 : 129 : ExecParallelReinitialize(PlanState *planstate,
930 : : ParallelExecutorInfo *pei,
931 : : Bitmapset *sendParams)
932 : : {
3041 rhaas@postgresql.org 933 : 129 : EState *estate = planstate->state;
934 : : FixedParallelExecutorState *fpes;
935 : :
936 : : /* Old workers must already be shut down */
3119 tgl@sss.pgh.pa.us 937 [ - + ]: 129 : Assert(pei->finished);
938 : :
939 : : /*
940 : : * Force any initplan outputs that we're going to pass to workers to be
941 : : * evaluated, if they weren't already (see comments in
942 : : * ExecInitParallelPlan).
943 : : */
2738 944 [ + - ]: 129 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
945 : :
3119 946 : 129 : ReinitializeParallelDSM(pei->pcxt);
947 : 129 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
3117 948 : 129 : pei->reader = NULL;
3119 949 : 129 : pei->finished = false;
950 : :
3041 rhaas@postgresql.org 951 : 129 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
952 : :
953 : : /* Free any serialized parameters from the last round. */
954 [ - + ]: 129 : if (DsaPointerIsValid(fpes->param_exec))
955 : : {
3009 rhaas@postgresql.org 956 :UBC 0 : dsa_free(pei->area, fpes->param_exec);
3041 957 : 0 : fpes->param_exec = InvalidDsaPointer;
958 : : }
959 : :
960 : : /* Serialize current parameter values if required. */
3041 rhaas@postgresql.org 961 [ - + ]:CBC 129 : if (!bms_is_empty(sendParams))
962 : : {
3009 rhaas@postgresql.org 963 :UBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
964 : : pei->area);
3041 965 : 0 : fpes->param_exec = pei->param_exec;
966 : : }
967 : :
968 : : /* Traverse plan tree and let each child node reset associated state. */
3009 rhaas@postgresql.org 969 :CBC 129 : estate->es_query_dsa = pei->area;
3119 tgl@sss.pgh.pa.us 970 : 129 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
3009 rhaas@postgresql.org 971 : 129 : estate->es_query_dsa = NULL;
3119 tgl@sss.pgh.pa.us 972 : 129 : }
973 : :
974 : : /*
975 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
976 : : */
977 : : static bool
978 : 333 : ExecParallelReInitializeDSM(PlanState *planstate,
979 : : ParallelContext *pcxt)
980 : : {
981 [ - + ]: 333 : if (planstate == NULL)
3119 tgl@sss.pgh.pa.us 982 :UBC 0 : return false;
983 : :
984 : : /*
985 : : * Call reinitializers for DSM-using plan nodes.
986 : : */
3119 tgl@sss.pgh.pa.us 987 [ + + + - :CBC 333 : switch (nodeTag(planstate))
- - - + +
+ + ]
988 : : {
989 : 138 : case T_SeqScanState:
990 [ + + ]: 138 : if (planstate->plan->parallel_aware)
991 : 114 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
992 : : pcxt);
993 : 138 : break;
994 : 6 : case T_IndexScanState:
995 [ + - ]: 6 : if (planstate->plan->parallel_aware)
996 : 6 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
997 : : pcxt);
998 : 6 : break;
999 : 6 : case T_IndexOnlyScanState:
1000 [ + - ]: 6 : if (planstate->plan->parallel_aware)
1001 : 6 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1002 : : pcxt);
1003 : 6 : break;
3119 tgl@sss.pgh.pa.us 1004 :UBC 0 : case T_ForeignScanState:
1005 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1006 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1007 : : pcxt);
1008 : 0 : break;
108 drowley@postgresql.o 1009 :UNC 0 : case T_TidRangeScanState:
1010 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1011 : 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1012 : : pcxt);
1013 : 0 : break;
3022 rhaas@postgresql.org 1014 :UBC 0 : case T_AppendState:
1015 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1016 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1017 : 0 : break;
3119 tgl@sss.pgh.pa.us 1018 : 0 : case T_CustomScanState:
1019 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1020 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1021 : : pcxt);
1022 : 0 : break;
3119 tgl@sss.pgh.pa.us 1023 :CBC 27 : case T_BitmapHeapScanState:
1024 [ + - ]: 27 : if (planstate->plan->parallel_aware)
1025 : 27 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1026 : : pcxt);
1027 : 27 : break;
3007 andres@anarazel.de 1028 : 48 : case T_HashJoinState:
1029 [ + + ]: 48 : if (planstate->plan->parallel_aware)
1030 : 24 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1031 : : pcxt);
1032 : 48 : break;
369 pg@bowt.ie 1033 : 90 : case T_BitmapIndexScanState:
1034 : : case T_HashState:
1035 : : case T_SortState:
1036 : : case T_IncrementalSortState:
1037 : : case T_MemoizeState:
1038 : : /* these nodes have DSM state, but no reinitialization is required */
3119 tgl@sss.pgh.pa.us 1039 : 90 : break;
1040 : :
1041 : 18 : default:
1042 : 18 : break;
1043 : : }
1044 : :
1045 : 333 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1046 : : }
1047 : :
1048 : : /*
1049 : : * Copy instrumentation information about this node and its descendants from
1050 : : * dynamic shared memory.
1051 : : */
1052 : : static bool
3821 rhaas@postgresql.org 1053 : 513 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1054 : : SharedExecutorInstrumentation *instrumentation)
1055 : : {
1056 : : Instrumentation *instrument;
1057 : : int i;
1058 : : int n;
1059 : : int ibytes;
3566 1060 : 513 : int plan_node_id = planstate->plan->plan_node_id;
1061 : : MemoryContext oldcontext;
1062 : :
1063 : : /* Find the instrumentation for this node. */
3749 1064 [ + - ]: 2319 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1065 [ + + ]: 2319 : if (instrumentation->plan_node_id[i] == plan_node_id)
3821 1066 : 513 : break;
3749 1067 [ - + ]: 513 : if (i >= instrumentation->num_plan_nodes)
3821 rhaas@postgresql.org 1068 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1069 : :
1070 : : /* Accumulate the statistics from all workers. */
3749 rhaas@postgresql.org 1071 :CBC 513 : instrument = GetInstrumentationArray(instrumentation);
1072 : 513 : instrument += i * instrumentation->num_workers;
1073 [ + + ]: 1353 : for (n = 0; n < instrumentation->num_workers; ++n)
1074 : 840 : InstrAggNode(planstate->instrument, &instrument[n]);
1075 : :
1076 : : /*
1077 : : * Also store the per-worker detail.
1078 : : *
1079 : : * Worker instrumentation should be allocated in the same context as the
1080 : : * regular instrumentation information, which is the per-query context.
1081 : : * Switch into per-query memory context.
1082 : : */
3014 1083 : 513 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1084 : 513 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1085 : 513 : planstate->worker_instrument =
1086 : 513 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1087 : 513 : MemoryContextSwitchTo(oldcontext);
1088 : :
3749 1089 : 513 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
3014 1090 : 513 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1091 : :
1092 : : /* Perform any node-type-specific work that needs to be done. */
3022 andres@anarazel.de 1093 [ + - - + : 513 : switch (nodeTag(planstate))
- + + - -
+ ]
1094 : : {
369 pg@bowt.ie 1095 : 135 : case T_IndexScanState:
1096 : 135 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1097 : 135 : break;
369 pg@bowt.ie 1098 :UBC 0 : case T_IndexOnlyScanState:
1099 : 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1100 : 0 : break;
1101 : 0 : case T_BitmapIndexScanState:
1102 : 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1103 : 0 : break;
3022 andres@anarazel.de 1104 :CBC 6 : case T_SortState:
1105 : 6 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1106 : 6 : break;
2169 tomas.vondra@postgre 1107 :UBC 0 : case T_IncrementalSortState:
1108 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1109 : 0 : break;
3022 andres@anarazel.de 1110 :CBC 42 : case T_HashState:
1111 : 42 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1112 : 42 : break;
2095 drowley@postgresql.o 1113 : 51 : case T_AggState:
1114 : 51 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1115 : 51 : break;
1705 drowley@postgresql.o 1116 :UBC 0 : case T_MemoizeState:
1117 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1808 1118 : 0 : break;
614 1119 : 0 : case T_BitmapHeapScanState:
1120 : 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1121 : 0 : break;
3022 andres@anarazel.de 1122 :CBC 279 : default:
1123 : 279 : break;
1124 : : }
1125 : :
3821 rhaas@postgresql.org 1126 : 513 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1127 : : instrumentation);
1128 : : }
1129 : :
1130 : : /*
1131 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1132 : : */
1133 : : static void
2728 andres@anarazel.de 1134 :LBC (12) : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1135 : : SharedJitInstrumentation *shared_jit)
1136 : : {
1137 : : JitInstrumentation *combined;
1138 : : int ibytes;
1139 : :
1140 : : int n;
1141 : :
1142 : : /*
1143 : : * Accumulate worker JIT instrumentation into the combined JIT
1144 : : * instrumentation, allocating it if required.
1145 : : */
2720 1146 [ # # ]: (12) : if (!planstate->state->es_jit_worker_instr)
1147 : (12) : planstate->state->es_jit_worker_instr =
2728 1148 : (12) : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
2720 1149 : (12) : combined = planstate->state->es_jit_worker_instr;
1150 : :
1151 : : /* Accumulate all the workers' instrumentations. */
2728 1152 [ # # ]: (36) : for (n = 0; n < shared_jit->num_workers; ++n)
1153 : (24) : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1154 : :
1155 : : /*
1156 : : * Store the per-worker detail.
1157 : : *
1158 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1159 : : * instrumentation in per-query context.
1160 : : */
1161 : (12) : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
2489 tgl@sss.pgh.pa.us 1162 : (12) : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
2728 andres@anarazel.de 1163 : (12) : planstate->worker_jit_instrument =
1164 : (12) : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1165 : :
1166 : (12) : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1167 : (12) : }
1168 : :
1169 : : /*
1170 : : * Finish parallel execution. We wait for parallel workers to finish, and
1171 : : * accumulate their buffer/WAL usage.
1172 : : */
1173 : : void
3821 rhaas@postgresql.org 1174 :CBC 927 : ExecParallelFinish(ParallelExecutorInfo *pei)
1175 : : {
3117 tgl@sss.pgh.pa.us 1176 : 927 : int nworkers = pei->pcxt->nworkers_launched;
1177 : : int i;
1178 : :
1179 : : /* Make this be a no-op if called twice in a row. */
3770 rhaas@postgresql.org 1180 [ + + ]: 927 : if (pei->finished)
1181 : 420 : return;
1182 : :
1183 : : /*
1184 : : * Detach from tuple queues ASAP, so that any still-active workers will
1185 : : * notice that no further results are wanted.
1186 : : */
3117 tgl@sss.pgh.pa.us 1187 [ + - ]: 507 : if (pei->tqueue != NULL)
1188 : : {
1189 [ + + ]: 1846 : for (i = 0; i < nworkers; i++)
1190 : 1339 : shm_mq_detach(pei->tqueue[i]);
1191 : 507 : pfree(pei->tqueue);
1192 : 507 : pei->tqueue = NULL;
1193 : : }
1194 : :
1195 : : /*
1196 : : * While we're waiting for the workers to finish, let's get rid of the
1197 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1198 : : */
1199 [ + + ]: 507 : if (pei->reader != NULL)
1200 : : {
1201 [ + + ]: 1837 : for (i = 0; i < nworkers; i++)
1202 : 1339 : DestroyTupleQueueReader(pei->reader[i]);
1203 : 498 : pfree(pei->reader);
1204 : 498 : pei->reader = NULL;
1205 : : }
1206 : :
1207 : : /* Now wait for the workers to finish. */
3821 rhaas@postgresql.org 1208 : 507 : WaitForParallelWorkersToFinish(pei->pcxt);
1209 : :
1210 : : /*
1211 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1212 : : * finish, or we might get incomplete data.)
1213 : : */
3117 tgl@sss.pgh.pa.us 1214 [ + + ]: 1846 : for (i = 0; i < nworkers; i++)
2171 akapila@postgresql.o 1215 : 1339 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1216 : :
3770 rhaas@postgresql.org 1217 : 507 : pei->finished = true;
1218 : : }
1219 : :
1220 : : /*
1221 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1222 : : * resources still exist after ExecParallelFinish. We separate these
1223 : : * routines because someone might want to examine the contents of the DSM
1224 : : * after ExecParallelFinish and before calling this routine.
1225 : : */
1226 : : void
3803 1227 : 378 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1228 : : {
1229 : : /* Accumulate instrumentation, if any. */
3008 1230 [ + + ]: 378 : if (pei->instrumentation)
1231 : 90 : ExecParallelRetrieveInstrumentation(pei->planstate,
1232 : : pei->instrumentation);
1233 : :
1234 : : /* Accumulate JIT instrumentation, if any. */
2728 andres@anarazel.de 1235 [ - + ]: 378 : if (pei->jit_instrumentation)
2728 andres@anarazel.de 1236 :LBC (12) : ExecParallelRetrieveJitInstrumentation(pei->planstate,
2489 tgl@sss.pgh.pa.us 1237 : (12) : pei->jit_instrumentation);
1238 : :
1239 : : /* Free any serialized parameters. */
3041 rhaas@postgresql.org 1240 [ + + ]:CBC 378 : if (DsaPointerIsValid(pei->param_exec))
1241 : : {
1242 : 12 : dsa_free(pei->area, pei->param_exec);
1243 : 12 : pei->param_exec = InvalidDsaPointer;
1244 : : }
3373 1245 [ + - ]: 378 : if (pei->area != NULL)
1246 : : {
1247 : 378 : dsa_detach(pei->area);
1248 : 378 : pei->area = NULL;
1249 : : }
3803 1250 [ + - ]: 378 : if (pei->pcxt != NULL)
1251 : : {
1252 : 378 : DestroyParallelContext(pei->pcxt);
1253 : 378 : pei->pcxt = NULL;
1254 : : }
1255 : 378 : pfree(pei);
1256 : 378 : }
1257 : :
1258 : : /*
1259 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1260 : : * for that purpose.
1261 : : */
1262 : : static DestReceiver *
3821 1263 : 1345 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1264 : : {
1265 : : char *mqspace;
1266 : : shm_mq *mq;
1267 : :
3205 tgl@sss.pgh.pa.us 1268 : 1345 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
3821 rhaas@postgresql.org 1269 : 1345 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1270 : 1345 : mq = (shm_mq *) mqspace;
1271 : 1345 : shm_mq_set_sender(mq, MyProc);
1272 : 1345 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1273 : : }
1274 : :
1275 : : /*
1276 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1277 : : */
1278 : : static QueryDesc *
1279 : 1345 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1280 : : int instrument_options)
1281 : : {
1282 : : char *pstmtspace;
1283 : : char *paramspace;
1284 : : PlannedStmt *pstmt;
1285 : : ParamListInfo paramLI;
1286 : : char *queryString;
1287 : :
1288 : : /* Get the query string from shared memory */
3205 tgl@sss.pgh.pa.us 1289 : 1345 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1290 : :
1291 : : /* Reconstruct leader-supplied PlannedStmt. */
1292 : 1345 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
3821 rhaas@postgresql.org 1293 : 1345 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1294 : :
1295 : : /* Reconstruct ParamListInfo. */
3041 1296 : 1345 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
3821 1297 : 1345 : paramLI = RestoreParamList(¶mspace);
1298 : :
1299 : : /* Create a QueryDesc for the query. */
1300 : 1345 : return CreateQueryDesc(pstmt,
1301 : : queryString,
1302 : : GetActiveSnapshot(), InvalidSnapshot,
1303 : : receiver, paramLI, NULL, instrument_options);
1304 : : }
1305 : :
1306 : : /*
1307 : : * Copy instrumentation information from this node and its descendants into
1308 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1309 : : */
1310 : : static bool
1311 : 1188 : ExecParallelReportInstrumentation(PlanState *planstate,
1312 : : SharedExecutorInstrumentation *instrumentation)
1313 : : {
1314 : : int i;
3566 1315 : 1188 : int plan_node_id = planstate->plan->plan_node_id;
1316 : : Instrumentation *instrument;
1317 : :
3749 1318 : 1188 : InstrEndLoop(planstate->instrument);
1319 : :
1320 : : /*
1321 : : * If we shuffled the plan_node_id values in ps_instrument into sorted
1322 : : * order, we could use binary search here. This might matter someday if
1323 : : * we're pushing down sufficiently large plan trees. For now, do it the
1324 : : * slow, dumb way.
1325 : : */
1326 [ + - ]: 3906 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1327 [ + + ]: 3906 : if (instrumentation->plan_node_id[i] == plan_node_id)
3821 1328 : 1188 : break;
3749 1329 [ - + ]: 1188 : if (i >= instrumentation->num_plan_nodes)
3821 rhaas@postgresql.org 1330 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1331 : :
1332 : : /*
1333 : : * Add our statistics to the per-node, per-worker totals. It's possible
1334 : : * that this could happen more than once if we relaunched workers.
1335 : : */
3749 rhaas@postgresql.org 1336 :CBC 1188 : instrument = GetInstrumentationArray(instrumentation);
1337 : 1188 : instrument += i * instrumentation->num_workers;
1338 [ - + ]: 1188 : Assert(IsParallelWorker());
1339 [ - + ]: 1188 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1340 : 1188 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1341 : :
3821 1342 : 1188 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1343 : : instrumentation);
1344 : : }
1345 : :
1346 : : /*
1347 : : * Initialize the PlanState and its descendants with the information
1348 : : * retrieved from shared memory. This has to be done once the PlanState
1349 : : * is allocated and initialized by executor; that is, after ExecutorStart().
1350 : : */
1351 : : static bool
3041 andres@anarazel.de 1352 : 4264 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1353 : : {
3777 rhaas@postgresql.org 1354 [ - + ]: 4264 : if (planstate == NULL)
3777 rhaas@postgresql.org 1355 :UBC 0 : return false;
1356 : :
3120 rhaas@postgresql.org 1357 [ + + + + :CBC 4264 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
1358 : : {
1359 : 1697 : case T_SeqScanState:
1360 [ + + ]: 1697 : if (planstate->plan->parallel_aware)
3041 andres@anarazel.de 1361 : 1383 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
3120 rhaas@postgresql.org 1362 : 1697 : break;
1363 : 198 : case T_IndexScanState:
1364 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 1365 : 198 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
3120 rhaas@postgresql.org 1366 : 198 : break;
1367 : 121 : case T_IndexOnlyScanState:
1368 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
369 pg@bowt.ie 1369 : 121 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1370 : : pwcxt);
1371 : 121 : break;
1372 : 136 : case T_BitmapIndexScanState:
1373 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1374 : 136 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1375 : : pwcxt);
3120 rhaas@postgresql.org 1376 : 136 : break;
3120 rhaas@postgresql.org 1377 :UBC 0 : case T_ForeignScanState:
1378 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 1379 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1380 : : pwcxt);
3120 1381 : 0 : break;
108 drowley@postgresql.o 1382 :GNC 48 : case T_TidRangeScanState:
1383 [ + - ]: 48 : if (planstate->plan->parallel_aware)
1384 : 48 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1385 : : pwcxt);
1386 : 48 : break;
3022 rhaas@postgresql.org 1387 :CBC 189 : case T_AppendState:
1388 [ + + ]: 189 : if (planstate->plan->parallel_aware)
1389 : 159 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1390 : 189 : break;
3120 rhaas@postgresql.org 1391 :UBC 0 : case T_CustomScanState:
1392 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3693 1393 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1394 : : pwcxt);
3120 1395 : 0 : break;
3120 rhaas@postgresql.org 1396 :CBC 136 : case T_BitmapHeapScanState:
1397 [ + + ]: 136 : if (planstate->plan->parallel_aware)
3041 andres@anarazel.de 1398 : 135 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1399 : : pwcxt);
3120 rhaas@postgresql.org 1400 : 136 : break;
3007 andres@anarazel.de 1401 : 279 : case T_HashJoinState:
1402 [ + + ]: 279 : if (planstate->plan->parallel_aware)
1403 : 159 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1404 : : pwcxt);
1405 : 279 : break;
3022 1406 : 279 : case T_HashState:
1407 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1408 : 279 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1409 : 279 : break;
3120 rhaas@postgresql.org 1410 : 232 : case T_SortState:
1411 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
3041 andres@anarazel.de 1412 : 232 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
3119 tgl@sss.pgh.pa.us 1413 : 232 : break;
2169 tomas.vondra@postgre 1414 :UBC 0 : case T_IncrementalSortState:
1415 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1416 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1417 : : pwcxt);
1418 : 0 : break;
2095 drowley@postgresql.o 1419 :CBC 830 : case T_AggState:
1420 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1421 : 830 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1422 : 830 : break;
1705 1423 : 6 : case T_MemoizeState:
1424 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1425 : 6 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1808 1426 : 6 : break;
3120 rhaas@postgresql.org 1427 : 113 : default:
1428 : 113 : break;
1429 : : }
1430 : :
3041 andres@anarazel.de 1431 : 4264 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1432 : : pwcxt);
1433 : : }
1434 : :
1435 : : /*
1436 : : * Main entrypoint for parallel query worker processes.
1437 : : *
1438 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1439 : : * create a sensible parallel environment has already been done;
1440 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1441 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1442 : : * here.
1443 : : *
1444 : : * Our job is to deal with concerns specific to the executor. The parallel
1445 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1446 : : * to execute that plan and write the resulting tuples to the appropriate
1447 : : * tuple queue. Various bits of supporting information that we need in order
1448 : : * to do this are also stored in the dsm_segment and can be accessed through
1449 : : * the shm_toc.
1450 : : */
1451 : : void
3821 rhaas@postgresql.org 1452 : 1345 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1453 : : {
1454 : : FixedParallelExecutorState *fpes;
1455 : : BufferUsage *buffer_usage;
1456 : : WalUsage *wal_usage;
1457 : : DestReceiver *receiver;
1458 : : QueryDesc *queryDesc;
1459 : : SharedExecutorInstrumentation *instrumentation;
1460 : : SharedJitInstrumentation *jit_instrumentation;
1461 : 1345 : int instrument_options = 0;
1462 : : void *area_space;
1463 : : dsa_area *area;
1464 : : ParallelWorkerContext pwcxt;
1465 : :
1466 : : /* Get fixed-size state. */
3120 1467 : 1345 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1468 : :
1469 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
3821 1470 : 1345 : receiver = ExecParallelGetReceiver(seg, toc);
3205 tgl@sss.pgh.pa.us 1471 : 1345 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
3821 rhaas@postgresql.org 1472 [ + + ]: 1345 : if (instrumentation != NULL)
1473 : 363 : instrument_options = instrumentation->instrument_options;
2728 andres@anarazel.de 1474 : 1345 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1475 : : true);
3821 rhaas@postgresql.org 1476 : 1345 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1477 : :
1478 : : /* Setting debug_query_string for individual workers */
3308 1479 : 1345 : debug_query_string = queryDesc->sourceText;
1480 : :
1481 : : /* Report workers' query for monitoring purposes */
1482 : 1345 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1483 : :
1484 : : /* Attach to the dynamic shared memory area. */
3205 tgl@sss.pgh.pa.us 1485 : 1345 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
3373 rhaas@postgresql.org 1486 : 1345 : area = dsa_attach_in_place(area_space, seg);
1487 : :
1488 : : /* Start up the executor */
2915 andres@anarazel.de 1489 : 1345 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
297 amitlan@postgresql.o 1490 : 1345 : ExecutorStart(queryDesc, fpes->eflags);
1491 : :
1492 : : /* Special executor initialization steps for parallel workers */
3373 rhaas@postgresql.org 1493 : 1345 : queryDesc->planstate->state->es_query_dsa = area;
3041 1494 [ + + ]: 1345 : if (DsaPointerIsValid(fpes->param_exec))
1495 : : {
1496 : : char *paramexec_space;
1497 : :
1498 : 36 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1499 : 36 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1500 : : }
andres@anarazel.de 1501 : 1345 : pwcxt.toc = toc;
1502 : 1345 : pwcxt.seg = seg;
1503 : 1345 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1504 : :
1505 : : /* Pass down any tuple bound */
3120 rhaas@postgresql.org 1506 : 1345 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1507 : :
1508 : : /*
1509 : : * Prepare to track buffer/WAL usage during query execution.
1510 : : *
1511 : : * We do this after starting up the executor to match what happens in the
1512 : : * leader, which also doesn't count buffer accesses and WAL activity that
1513 : : * occur during executor startup.
1514 : : */
2781 akapila@postgresql.o 1515 : 1345 : InstrStartParallelQuery();
1516 : :
1517 : : /*
1518 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1519 : : * more tuples than that.
1520 : : */
3120 rhaas@postgresql.org 1521 : 1345 : ExecutorRun(queryDesc,
1522 : : ForwardScanDirection,
461 tgl@sss.pgh.pa.us 1523 : 1345 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1524 : :
1525 : : /* Shut down the executor */
3821 rhaas@postgresql.org 1526 : 1339 : ExecutorFinish(queryDesc);
1527 : :
1528 : : /* Report buffer/WAL usage during parallel execution. */
3205 tgl@sss.pgh.pa.us 1529 : 1339 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2171 akapila@postgresql.o 1530 : 1339 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1531 : 1339 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1532 : 1339 : &wal_usage[ParallelWorkerNumber]);
1533 : :
1534 : : /* Report instrumentation data if any instrumentation options are set. */
3821 rhaas@postgresql.org 1535 [ + + ]: 1339 : if (instrumentation != NULL)
1536 : 363 : ExecParallelReportInstrumentation(queryDesc->planstate,
1537 : : instrumentation);
1538 : :
1539 : : /* Report JIT instrumentation data if any */
2728 andres@anarazel.de 1540 [ - + - - ]: 1339 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1541 : : {
2728 andres@anarazel.de 1542 [ # # ]:UBC 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1543 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1544 : 0 : queryDesc->estate->es_jit->instr;
1545 : : }
1546 : :
1547 : : /* Must do this after capturing instrumentation. */
3819 rhaas@postgresql.org 1548 :CBC 1339 : ExecutorEnd(queryDesc);
1549 : :
1550 : : /* Cleanup. */
3373 1551 : 1339 : dsa_detach(area);
3821 1552 : 1339 : FreeQueryDesc(queryDesc);
3111 peter_e@gmx.net 1553 : 1339 : receiver->rDestroy(receiver);
3821 rhaas@postgresql.org 1554 : 1339 : }
|