Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeBitmapIndexscan.h"
32 : : #include "executor/nodeCustom.h"
33 : : #include "executor/nodeForeignscan.h"
34 : : #include "executor/nodeHash.h"
35 : : #include "executor/nodeHashjoin.h"
36 : : #include "executor/nodeIncrementalSort.h"
37 : : #include "executor/nodeIndexonlyscan.h"
38 : : #include "executor/nodeIndexscan.h"
39 : : #include "executor/nodeMemoize.h"
40 : : #include "executor/nodeSeqscan.h"
41 : : #include "executor/nodeSort.h"
42 : : #include "executor/nodeSubplan.h"
43 : : #include "executor/nodeTidrangescan.h"
44 : : #include "executor/tqueue.h"
45 : : #include "jit/jit.h"
46 : : #include "nodes/nodeFuncs.h"
47 : : #include "pgstat.h"
48 : : #include "tcop/tcopprot.h"
49 : : #include "utils/datum.h"
50 : : #include "utils/dsa.h"
51 : : #include "utils/lsyscache.h"
52 : : #include "utils/snapmgr.h"
53 : :
54 : : /*
55 : : * Magic numbers for parallel executor communication. We use constants
56 : : * greater than any 32-bit integer here so that values < 2^32 can be used
57 : : * by individual parallel nodes to store their own state.
58 : : */
59 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
60 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
61 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
62 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
63 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
64 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
65 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
66 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
67 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
68 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
69 : :
70 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
71 : :
72 : : /*
73 : : * Fixed-size random stuff that we need to pass to parallel workers.
74 : : */
75 : : typedef struct FixedParallelExecutorState
76 : : {
77 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
78 : : dsa_pointer param_exec;
79 : : int eflags;
80 : : int jit_flags;
81 : : } FixedParallelExecutorState;
82 : :
83 : : /*
84 : : * DSM structure for accumulating per-PlanState instrumentation.
85 : : *
86 : : * instrument_options: Same meaning here as in instrument.c.
87 : : *
88 : : * instrument_offset: Offset, relative to the start of this structure,
89 : : * of the first Instrumentation object. This will depend on the length of
90 : : * the plan_node_id array.
91 : : *
92 : : * num_workers: Number of workers.
93 : : *
94 : : * num_plan_nodes: Number of plan nodes.
95 : : *
96 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
97 : : * from parallel workers. The length of this array is given by num_plan_nodes.
98 : : */
99 : : struct SharedExecutorInstrumentation
100 : : {
101 : : int instrument_options;
102 : : int instrument_offset;
103 : : int num_workers;
104 : : int num_plan_nodes;
105 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
106 : : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
107 : : };
108 : : #define GetInstrumentationArray(sei) \
109 : : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
110 : : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
111 : :
112 : : /* Context object for ExecParallelEstimate. */
113 : : typedef struct ExecParallelEstimateContext
114 : : {
115 : : ParallelContext *pcxt;
116 : : int nnodes;
117 : : } ExecParallelEstimateContext;
118 : :
119 : : /* Context object for ExecParallelInitializeDSM. */
120 : : typedef struct ExecParallelInitializeDSMContext
121 : : {
122 : : ParallelContext *pcxt;
123 : : SharedExecutorInstrumentation *instrumentation;
124 : : int nnodes;
125 : : } ExecParallelInitializeDSMContext;
126 : :
127 : : /* Helper functions that run in the parallel leader. */
128 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
129 : : static bool ExecParallelEstimate(PlanState *planstate,
130 : : ExecParallelEstimateContext *e);
131 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
132 : : ExecParallelInitializeDSMContext *d);
133 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
134 : : bool reinitialize);
135 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
136 : : ParallelContext *pcxt);
137 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
138 : : SharedExecutorInstrumentation *instrumentation);
139 : :
140 : : /* Helper function that runs in the parallel worker. */
141 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
142 : :
143 : : /*
144 : : * Create a serialized representation of the plan to be sent to each worker.
145 : : */
146 : : static char *
3730 rhaas@postgresql.org 147 :CBC 378 : ExecSerializePlan(Plan *plan, EState *estate)
148 : : {
149 : : PlannedStmt *pstmt;
150 : : ListCell *lc;
151 : :
152 : : /* We can't scribble on the original plan, so make a copy. */
3732 153 : 378 : plan = copyObject(plan);
154 : :
155 : : /*
156 : : * The worker will start its own copy of the executor, and that copy will
157 : : * insert a junk filter if the toplevel node has any resjunk entries. We
158 : : * don't want that to happen, because while resjunk columns shouldn't be
159 : : * sent back to the user, here the tuples are coming back to another
160 : : * backend which may very well need them. So mutate the target list
161 : : * accordingly. This is sort of a hack; there might be better ways to do
162 : : * this...
163 : : */
3170 tgl@sss.pgh.pa.us 164 [ + + + + : 1033 : foreach(lc, plan->targetlist)
+ + ]
165 : : {
166 : 655 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
167 : :
3732 rhaas@postgresql.org 168 : 655 : tle->resjunk = false;
169 : : }
170 : :
171 : : /*
172 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
173 : : * for our purposes, but the worker will need at least a minimal
174 : : * PlannedStmt to start the executor.
175 : : */
176 : 378 : pstmt = makeNode(PlannedStmt);
177 : 378 : pstmt->commandType = CMD_SELECT;
1701 bruce@momjian.us 178 : 378 : pstmt->queryId = pgstat_get_my_query_id();
267 michael@paquier.xyz 179 : 378 : pstmt->planId = pgstat_get_my_plan_id();
3441 tgl@sss.pgh.pa.us 180 : 378 : pstmt->hasReturning = false;
181 : 378 : pstmt->hasModifyingCTE = false;
182 : 378 : pstmt->canSetTag = true;
183 : 378 : pstmt->transientPlan = false;
184 : 378 : pstmt->dependsOnRole = false;
185 : 378 : pstmt->parallelModeNeeded = false;
3732 rhaas@postgresql.org 186 : 378 : pstmt->planTree = plan;
320 amitlan@postgresql.o 187 : 378 : pstmt->partPruneInfos = estate->es_part_prune_infos;
3730 rhaas@postgresql.org 188 : 378 : pstmt->rtable = estate->es_range_table;
312 amitlan@postgresql.o 189 : 378 : pstmt->unprunableRelids = estate->es_unpruned_relids;
1106 alvherre@alvh.no-ip. 190 : 378 : pstmt->permInfos = estate->es_rteperminfos;
3732 rhaas@postgresql.org 191 : 378 : pstmt->resultRelations = NIL;
2197 tgl@sss.pgh.pa.us 192 : 378 : pstmt->appendRelations = NIL;
138 michael@paquier.xyz 193 :GNC 378 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
194 : :
195 : : /*
196 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
197 : : * for unsafe ones (so that the list indexes of the safe ones are
198 : : * preserved). This positively ensures that the worker won't try to run,
199 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
200 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
201 : : */
3170 tgl@sss.pgh.pa.us 202 :CBC 378 : pstmt->subplans = NIL;
203 [ + + + + : 405 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
204 : : {
205 : 27 : Plan *subplan = (Plan *) lfirst(lc);
206 : :
207 [ + - + + ]: 27 : if (subplan && !subplan->parallel_safe)
208 : 6 : subplan = NULL;
209 : 27 : pstmt->subplans = lappend(pstmt->subplans, subplan);
210 : : }
211 : :
3732 rhaas@postgresql.org 212 : 378 : pstmt->rewindPlanIDs = NULL;
213 : 378 : pstmt->rowMarks = NIL;
214 : 378 : pstmt->relationOids = NIL;
215 : 378 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
2955 216 : 378 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
3258 tgl@sss.pgh.pa.us 217 : 378 : pstmt->utilityStmt = NULL;
218 : 378 : pstmt->stmt_location = -1;
219 : 378 : pstmt->stmt_len = -1;
220 : :
221 : : /* Return serialized copy of our dummy PlannedStmt. */
3732 rhaas@postgresql.org 222 : 378 : return nodeToString(pstmt);
223 : : }
224 : :
225 : : /*
226 : : * Parallel-aware plan nodes (and occasionally others) may need some state
227 : : * which is shared across all parallel workers. Before we size the DSM, give
228 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
229 : : * &pcxt->estimator.
230 : : *
231 : : * While we're at it, count the number of PlanState nodes in the tree, so
232 : : * we know how many Instrumentation structures we need.
233 : : */
234 : : static bool
235 : 1527 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
236 : : {
237 [ - + ]: 1527 : if (planstate == NULL)
3732 rhaas@postgresql.org 238 :UBC 0 : return false;
239 : :
240 : : /* Count this node. */
3732 rhaas@postgresql.org 241 :CBC 1527 : e->nnodes++;
242 : :
3031 243 [ + + + + : 1527 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
244 : : {
245 : 582 : case T_SeqScanState:
246 [ + + ]: 582 : if (planstate->plan->parallel_aware)
3618 247 : 463 : ExecSeqScanEstimate((SeqScanState *) planstate,
248 : : e->pcxt);
3031 249 : 582 : break;
250 : 147 : case T_IndexScanState:
251 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 252 : 147 : ExecIndexScanEstimate((IndexScanState *) planstate,
253 : : e->pcxt);
3031 rhaas@postgresql.org 254 : 147 : break;
255 : 29 : case T_IndexOnlyScanState:
256 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 257 : 29 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
258 : : e->pcxt);
259 : 29 : break;
260 : 10 : case T_BitmapIndexScanState:
261 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
262 : 10 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
263 : : e->pcxt);
3031 rhaas@postgresql.org 264 : 10 : break;
3031 rhaas@postgresql.org 265 :UBC 0 : case T_ForeignScanState:
266 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 267 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
268 : : e->pcxt);
3031 269 : 0 : break;
19 drowley@postgresql.o 270 :GNC 12 : case T_TidRangeScanState:
271 [ + - ]: 12 : if (planstate->plan->parallel_aware)
272 : 12 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
273 : : e->pcxt);
274 : 12 : break;
2933 rhaas@postgresql.org 275 :CBC 93 : case T_AppendState:
276 [ + + ]: 93 : if (planstate->plan->parallel_aware)
277 : 69 : ExecAppendEstimate((AppendState *) planstate,
278 : : e->pcxt);
279 : 93 : break;
3031 rhaas@postgresql.org 280 :UBC 0 : case T_CustomScanState:
281 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 282 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
283 : : e->pcxt);
3031 284 : 0 : break;
3031 rhaas@postgresql.org 285 :CBC 10 : case T_BitmapHeapScanState:
286 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3205 287 : 9 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
288 : : e->pcxt);
3031 289 : 10 : break;
2918 andres@anarazel.de 290 : 99 : case T_HashJoinState:
291 [ + + ]: 99 : if (planstate->plan->parallel_aware)
292 : 63 : ExecHashJoinEstimate((HashJoinState *) planstate,
293 : : e->pcxt);
294 : 99 : break;
2933 295 : 99 : case T_HashState:
296 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
297 : 99 : ExecHashEstimate((HashState *) planstate, e->pcxt);
298 : 99 : break;
3031 rhaas@postgresql.org 299 : 79 : case T_SortState:
300 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
301 : 79 : ExecSortEstimate((SortState *) planstate, e->pcxt);
3030 tgl@sss.pgh.pa.us 302 : 79 : break;
2080 tomas.vondra@postgre 303 :UBC 0 : case T_IncrementalSortState:
304 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
305 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
306 : 0 : break;
2006 drowley@postgresql.o 307 :CBC 293 : case T_AggState:
308 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
309 : 293 : ExecAggEstimate((AggState *) planstate, e->pcxt);
310 : 293 : break;
1616 311 : 3 : case T_MemoizeState:
312 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
313 : 3 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
1719 314 : 3 : break;
3031 rhaas@postgresql.org 315 : 71 : default:
316 : 71 : break;
317 : : }
318 : :
3732 319 : 1527 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
320 : : }
321 : :
322 : : /*
323 : : * Estimate the amount of space required to serialize the indicated parameters.
324 : : */
325 : : static Size
2952 326 : 12 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
327 : : {
328 : : int paramid;
329 : 12 : Size sz = sizeof(int);
330 : :
331 : 12 : paramid = -1;
332 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
333 : : {
334 : : Oid typeOid;
335 : : int16 typLen;
336 : : bool typByVal;
337 : : ParamExecData *prm;
338 : :
339 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
340 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
341 : : paramid);
342 : :
343 : 15 : sz = add_size(sz, sizeof(int)); /* space for paramid */
344 : :
345 : : /* space for datum/isnull */
346 [ + - ]: 15 : if (OidIsValid(typeOid))
347 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
348 : : else
349 : : {
350 : : /* If no type OID, assume by-value, like copyParamList does. */
2952 rhaas@postgresql.org 351 :UBC 0 : typLen = sizeof(Datum);
352 : 0 : typByVal = true;
353 : : }
2952 rhaas@postgresql.org 354 :CBC 15 : sz = add_size(sz,
355 : 15 : datumEstimateSpace(prm->value, prm->isnull,
356 : : typByVal, typLen));
357 : : }
358 : 12 : return sz;
359 : : }
360 : :
361 : : /*
362 : : * Serialize specified PARAM_EXEC parameters.
363 : : *
364 : : * We write the number of parameters first, as a 4-byte integer, and then
365 : : * write details for each parameter in turn. The details for each parameter
366 : : * consist of a 4-byte paramid (location of param in execution time internal
367 : : * parameter array) and then the datum as serialized by datumSerialize().
368 : : */
369 : : static dsa_pointer
2920 370 : 12 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
371 : : {
372 : : Size size;
373 : : int nparams;
374 : : int paramid;
375 : : ParamExecData *prm;
376 : : dsa_pointer handle;
377 : : char *start_address;
378 : :
379 : : /* Allocate enough space for the current parameter values. */
2952 380 : 12 : size = EstimateParamExecSpace(estate, params);
2920 381 : 12 : handle = dsa_allocate(area, size);
382 : 12 : start_address = dsa_get_address(area, handle);
383 : :
384 : : /* First write the number of parameters as a 4-byte integer. */
2952 385 : 12 : nparams = bms_num_members(params);
386 : 12 : memcpy(start_address, &nparams, sizeof(int));
387 : 12 : start_address += sizeof(int);
388 : :
389 : : /* Write details for each parameter in turn. */
390 : 12 : paramid = -1;
391 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
392 : : {
393 : : Oid typeOid;
394 : : int16 typLen;
395 : : bool typByVal;
396 : :
397 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
398 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
399 : : paramid);
400 : :
401 : : /* Write paramid. */
402 : 15 : memcpy(start_address, ¶mid, sizeof(int));
403 : 15 : start_address += sizeof(int);
404 : :
405 : : /* Write datum/isnull */
406 [ + - ]: 15 : if (OidIsValid(typeOid))
407 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
408 : : else
409 : : {
410 : : /* If no type OID, assume by-value, like copyParamList does. */
2952 rhaas@postgresql.org 411 :UBC 0 : typLen = sizeof(Datum);
412 : 0 : typByVal = true;
413 : : }
2952 rhaas@postgresql.org 414 :CBC 15 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
415 : : &start_address);
416 : : }
417 : :
418 : 12 : return handle;
419 : : }
420 : :
421 : : /*
422 : : * Restore specified PARAM_EXEC parameters.
423 : : */
424 : : static void
425 : 36 : RestoreParamExecParams(char *start_address, EState *estate)
426 : : {
427 : : int nparams;
428 : : int i;
429 : : int paramid;
430 : :
431 : 36 : memcpy(&nparams, start_address, sizeof(int));
432 : 36 : start_address += sizeof(int);
433 : :
434 [ + + ]: 78 : for (i = 0; i < nparams; i++)
435 : : {
436 : : ParamExecData *prm;
437 : :
438 : : /* Read paramid */
439 : 42 : memcpy(¶mid, start_address, sizeof(int));
440 : 42 : start_address += sizeof(int);
441 : 42 : prm = &(estate->es_param_exec_vals[paramid]);
442 : :
443 : : /* Read datum/isnull. */
444 : 42 : prm->value = datumRestore(&start_address, &prm->isnull);
445 : 42 : prm->execPlan = NULL;
446 : : }
447 : 36 : }
448 : :
449 : : /*
450 : : * Initialize the dynamic shared memory segment that will be used to control
451 : : * parallel execution.
452 : : */
453 : : static bool
3732 454 : 1527 : ExecParallelInitializeDSM(PlanState *planstate,
455 : : ExecParallelInitializeDSMContext *d)
456 : : {
457 [ - + ]: 1527 : if (planstate == NULL)
3732 rhaas@postgresql.org 458 :UBC 0 : return false;
459 : :
460 : : /* If instrumentation is enabled, initialize slot for this node. */
3732 rhaas@postgresql.org 461 [ + + ]:CBC 1527 : if (d->instrumentation != NULL)
3660 462 : 513 : d->instrumentation->plan_node_id[d->nnodes] =
463 : 513 : planstate->plan->plan_node_id;
464 : :
465 : : /* Count this node. */
3732 466 : 1527 : d->nnodes++;
467 : :
468 : : /*
469 : : * Call initializers for DSM-using plan nodes.
470 : : *
471 : : * Most plan nodes won't do anything here, but plan nodes that allocated
472 : : * DSM may need to initialize shared state in the DSM before parallel
473 : : * workers are launched. They can allocate the space they previously
474 : : * estimated using shm_toc_allocate, and add the keys they previously
475 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
476 : : */
3031 477 [ + + + + : 1527 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
478 : : {
479 : 582 : case T_SeqScanState:
480 [ + + ]: 582 : if (planstate->plan->parallel_aware)
3618 481 : 463 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
482 : : d->pcxt);
3031 483 : 582 : break;
484 : 147 : case T_IndexScanState:
485 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 486 : 147 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
3031 rhaas@postgresql.org 487 : 147 : break;
488 : 29 : case T_IndexOnlyScanState:
489 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 490 : 29 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
491 : : d->pcxt);
492 : 29 : break;
493 : 10 : case T_BitmapIndexScanState:
494 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
495 : 10 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
3031 rhaas@postgresql.org 496 : 10 : break;
3031 rhaas@postgresql.org 497 :UBC 0 : case T_ForeignScanState:
498 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 499 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
500 : : d->pcxt);
3031 501 : 0 : break;
19 drowley@postgresql.o 502 :GNC 12 : case T_TidRangeScanState:
503 [ + - ]: 12 : if (planstate->plan->parallel_aware)
504 : 12 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
505 : : d->pcxt);
506 : 12 : break;
2933 rhaas@postgresql.org 507 :CBC 93 : case T_AppendState:
508 [ + + ]: 93 : if (planstate->plan->parallel_aware)
509 : 69 : ExecAppendInitializeDSM((AppendState *) planstate,
510 : : d->pcxt);
511 : 93 : break;
3031 rhaas@postgresql.org 512 :UBC 0 : case T_CustomScanState:
513 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 514 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
515 : : d->pcxt);
3031 516 : 0 : break;
3031 rhaas@postgresql.org 517 :CBC 10 : case T_BitmapHeapScanState:
518 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3205 519 : 9 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
520 : : d->pcxt);
3031 521 : 10 : break;
2918 andres@anarazel.de 522 : 99 : case T_HashJoinState:
523 [ + + ]: 99 : if (planstate->plan->parallel_aware)
524 : 63 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
525 : : d->pcxt);
526 : 99 : break;
2933 527 : 99 : case T_HashState:
528 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
529 : 99 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
530 : 99 : break;
3031 rhaas@postgresql.org 531 : 79 : case T_SortState:
532 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
533 : 79 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
3030 tgl@sss.pgh.pa.us 534 : 79 : break;
2080 tomas.vondra@postgre 535 :UBC 0 : case T_IncrementalSortState:
536 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
537 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
538 : 0 : break;
2006 drowley@postgresql.o 539 :CBC 293 : case T_AggState:
540 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
541 : 293 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
542 : 293 : break;
1616 543 : 3 : case T_MemoizeState:
544 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
545 : 3 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
1719 546 : 3 : break;
3031 rhaas@postgresql.org 547 : 71 : default:
548 : 71 : break;
549 : : }
550 : :
3732 551 : 1527 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
552 : : }
553 : :
554 : : /*
555 : : * It sets up the response queues for backend workers to return tuples
556 : : * to the main backend and start the workers.
557 : : */
558 : : static shm_mq_handle **
3700 559 : 507 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
560 : : {
561 : : shm_mq_handle **responseq;
562 : : char *tqueuespace;
563 : : int i;
564 : :
565 : : /* Skip this if no workers. */
3732 566 [ - + ]: 507 : if (pcxt->nworkers == 0)
3732 rhaas@postgresql.org 567 :UBC 0 : return NULL;
568 : :
569 : : /* Allocate memory for shared memory queue handles. */
570 : : responseq = (shm_mq_handle **)
3732 rhaas@postgresql.org 571 :CBC 507 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
572 : :
573 : : /*
574 : : * If not reinitializing, allocate space from the DSM for the queues;
575 : : * otherwise, find the already allocated space.
576 : : */
3700 577 [ + + ]: 507 : if (!reinitialize)
578 : : tqueuespace =
579 : 378 : shm_toc_allocate(pcxt->toc,
580 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
3511 581 : 378 : pcxt->nworkers));
582 : : else
3116 tgl@sss.pgh.pa.us 583 : 129 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
584 : :
585 : : /* Create the queues, and become the receiver for each. */
3732 rhaas@postgresql.org 586 [ + + ]: 1880 : for (i = 0; i < pcxt->nworkers; ++i)
587 : : {
588 : : shm_mq *mq;
589 : :
3511 590 : 1373 : mq = shm_mq_create(tqueuespace +
591 : 1373 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
592 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
593 : :
3732 594 : 1373 : shm_mq_set_receiver(mq, MyProc);
595 : 1373 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
596 : : }
597 : :
598 : : /* Add array of queues to shm_toc, so others can find it. */
3700 599 [ + + ]: 507 : if (!reinitialize)
600 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
601 : :
602 : : /* Return array of handles. */
3732 603 : 507 : return responseq;
604 : : }
605 : :
606 : : /*
607 : : * Sets up the required infrastructure for backend workers to perform
608 : : * execution and return results to the main backend.
609 : : */
610 : : ParallelExecutorInfo *
2952 611 : 378 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
612 : : Bitmapset *sendParams, int nworkers,
613 : : int64 tuples_needed)
614 : : {
615 : : ParallelExecutorInfo *pei;
616 : : ParallelContext *pcxt;
617 : : ExecParallelEstimateContext e;
618 : : ExecParallelInitializeDSMContext d;
619 : : FixedParallelExecutorState *fpes;
620 : : char *pstmt_data;
621 : : char *pstmt_space;
622 : : char *paramlistinfo_space;
623 : : BufferUsage *bufusage_space;
624 : : WalUsage *walusage_space;
3732 625 : 378 : SharedExecutorInstrumentation *instrumentation = NULL;
2639 andres@anarazel.de 626 : 378 : SharedJitInstrumentation *jit_instrumentation = NULL;
627 : : int pstmt_len;
628 : : int paramlistinfo_len;
3732 rhaas@postgresql.org 629 : 378 : int instrumentation_len = 0;
2639 andres@anarazel.de 630 : 378 : int jit_instrumentation_len = 0;
3660 rhaas@postgresql.org 631 : 378 : int instrument_offset = 0;
3284 632 : 378 : Size dsa_minsize = dsa_minimum_size();
633 : : char *query_string;
634 : : int query_len;
635 : :
636 : : /*
637 : : * Force any initplan outputs that we're going to pass to workers to be
638 : : * evaluated, if they weren't already.
639 : : *
640 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
641 : : * That risks intra-query memory leakage, since we might pass through here
642 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
643 : : * doesn't normally leak any memory in the context (see its comments), so
644 : : * it doesn't seem worth complicating this function's API to pass it a
645 : : * shorter-lived ExprContext. This might need to change someday.
646 : : */
2649 tgl@sss.pgh.pa.us 647 [ + + ]: 378 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
648 : :
649 : : /* Allocate object for return value. */
6 michael@paquier.xyz 650 :GNC 378 : pei = palloc0_object(ParallelExecutorInfo);
3681 rhaas@postgresql.org 651 :CBC 378 : pei->finished = false;
3732 652 : 378 : pei->planstate = planstate;
653 : :
654 : : /* Fix up and serialize plan to be sent to workers. */
3730 655 : 378 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
656 : :
657 : : /* Create a parallel context. */
2468 tmunro@postgresql.or 658 : 378 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
3732 rhaas@postgresql.org 659 : 378 : pei->pcxt = pcxt;
660 : :
661 : : /*
662 : : * Before telling the parallel context to create a dynamic shared memory
663 : : * segment, we need to figure out how big it should be. Estimate space
664 : : * for the various things we need to store.
665 : : */
666 : :
667 : : /* Estimate space for fixed-size state. */
3031 668 : 378 : shm_toc_estimate_chunk(&pcxt->estimator,
669 : : sizeof(FixedParallelExecutorState));
670 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
671 : :
672 : : /* Estimate space for query text. */
1706 tgl@sss.pgh.pa.us 673 : 378 : query_len = strlen(estate->es_sourceText);
2918 rhaas@postgresql.org 674 : 378 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
3219 675 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
676 : :
677 : : /* Estimate space for serialized PlannedStmt. */
3732 678 : 378 : pstmt_len = strlen(pstmt_data) + 1;
679 : 378 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
680 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
681 : :
682 : : /* Estimate space for serialized ParamListInfo. */
2952 683 : 378 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
684 : 378 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
3732 685 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
686 : :
687 : : /*
688 : : * Estimate space for BufferUsage.
689 : : *
690 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
691 : : * we could skip this. But we have no way of knowing whether anyone's
692 : : * looking at pgBufferUsage, so do it unconditionally.
693 : : */
694 : 378 : shm_toc_estimate_chunk(&pcxt->estimator,
695 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
696 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
697 : :
698 : : /*
699 : : * Same thing for WalUsage.
700 : : */
2082 akapila@postgresql.o 701 : 378 : shm_toc_estimate_chunk(&pcxt->estimator,
702 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
703 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
704 : :
705 : : /* Estimate space for tuple queues. */
3732 rhaas@postgresql.org 706 : 378 : shm_toc_estimate_chunk(&pcxt->estimator,
707 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
708 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
709 : :
710 : : /*
711 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
712 : : * count of how many PlanState nodes there are.
713 : : */
714 : 378 : e.pcxt = pcxt;
715 : 378 : e.nnodes = 0;
716 : 378 : ExecParallelEstimate(planstate, &e);
717 : :
718 : : /* Estimate space for instrumentation, if required. */
719 [ + + ]: 378 : if (estate->es_instrument)
720 : : {
721 : 90 : instrumentation_len =
722 : : offsetof(SharedExecutorInstrumentation, plan_node_id) +
3514 723 : 90 : sizeof(int) * e.nnodes;
3660 724 : 90 : instrumentation_len = MAXALIGN(instrumentation_len);
725 : 90 : instrument_offset = instrumentation_len;
3511 726 : 90 : instrumentation_len +=
727 : 90 : mul_size(sizeof(Instrumentation),
728 : 90 : mul_size(e.nnodes, nworkers));
3732 729 : 90 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
730 : 90 : shm_toc_estimate_keys(&pcxt->estimator, 1);
731 : :
732 : : /* Estimate space for JIT instrumentation, if required. */
2639 andres@anarazel.de 733 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
734 : : {
735 : 12 : jit_instrumentation_len =
736 : 12 : offsetof(SharedJitInstrumentation, jit_instr) +
737 : : sizeof(JitInstrumentation) * nworkers;
738 : 12 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
739 : 12 : shm_toc_estimate_keys(&pcxt->estimator, 1);
740 : : }
741 : : }
742 : :
743 : : /* Estimate space for DSA area. */
3284 rhaas@postgresql.org 744 : 378 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
745 : 378 : shm_toc_estimate_keys(&pcxt->estimator, 1);
746 : :
747 : : /*
748 : : * InitializeParallelDSM() passes the active snapshot to the parallel
749 : : * worker, which uses it to set es_snapshot. Make sure we don't set
750 : : * es_snapshot differently in the child.
751 : : */
642 heikki.linnakangas@i 752 [ - + ]: 378 : Assert(GetActiveSnapshot() == estate->es_snapshot);
753 : :
754 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
3732 rhaas@postgresql.org 755 : 378 : InitializeParallelDSM(pcxt);
756 : :
757 : : /*
758 : : * OK, now we have a dynamic shared memory segment, and it should be big
759 : : * enough to store all of the data we estimated we would want to put into
760 : : * it, plus whatever general stuff (not specifically executor-related) the
761 : : * ParallelContext itself needs to store there. None of the space we
762 : : * asked for has been allocated or initialized yet, though, so do that.
763 : : */
764 : :
765 : : /* Store fixed-size state. */
3031 766 : 378 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
767 : 378 : fpes->tuples_needed = tuples_needed;
2952 768 : 378 : fpes->param_exec = InvalidDsaPointer;
2948 769 : 378 : fpes->eflags = estate->es_top_eflags;
2826 andres@anarazel.de 770 : 378 : fpes->jit_flags = estate->es_jit_flags;
3031 rhaas@postgresql.org 771 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
772 : :
773 : : /* Store query string */
2918 774 : 378 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
1706 tgl@sss.pgh.pa.us 775 : 378 : memcpy(query_string, estate->es_sourceText, query_len + 1);
3219 rhaas@postgresql.org 776 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
777 : :
778 : : /* Store serialized PlannedStmt. */
3732 779 : 378 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
780 : 378 : memcpy(pstmt_space, pstmt_data, pstmt_len);
781 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
782 : :
783 : : /* Store serialized ParamListInfo. */
2952 784 : 378 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
785 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
786 : 378 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
787 : :
788 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
3732 789 : 378 : bufusage_space = shm_toc_allocate(pcxt->toc,
3100 tgl@sss.pgh.pa.us 790 : 378 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
3732 rhaas@postgresql.org 791 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
792 : 378 : pei->buffer_usage = bufusage_space;
793 : :
794 : : /* Same for WalUsage. */
2082 akapila@postgresql.o 795 : 378 : walusage_space = shm_toc_allocate(pcxt->toc,
796 : 378 : mul_size(sizeof(WalUsage), pcxt->nworkers));
797 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
798 : 378 : pei->wal_usage = walusage_space;
799 : :
800 : : /* Set up the tuple queues that the workers will write into. */
3700 rhaas@postgresql.org 801 : 378 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
802 : :
803 : : /* We don't need the TupleQueueReaders yet, though. */
3028 tgl@sss.pgh.pa.us 804 : 378 : pei->reader = NULL;
805 : :
806 : : /*
807 : : * If instrumentation options were supplied, allocate space for the data.
808 : : * It only gets partially initialized here; the rest happens during
809 : : * ExecParallelInitializeDSM.
810 : : */
3732 rhaas@postgresql.org 811 [ + + ]: 378 : if (estate->es_instrument)
812 : : {
813 : : Instrumentation *instrument;
814 : : int i;
815 : :
816 : 90 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
817 : 90 : instrumentation->instrument_options = estate->es_instrument;
3660 818 : 90 : instrumentation->instrument_offset = instrument_offset;
819 : 90 : instrumentation->num_workers = nworkers;
820 : 90 : instrumentation->num_plan_nodes = e.nnodes;
821 : 90 : instrument = GetInstrumentationArray(instrumentation);
822 [ + + ]: 930 : for (i = 0; i < nworkers * e.nnodes; ++i)
823 : 840 : InstrInit(&instrument[i], estate->es_instrument);
3732 824 : 90 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
825 : : instrumentation);
826 : 90 : pei->instrumentation = instrumentation;
827 : :
2639 andres@anarazel.de 828 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
829 : : {
830 : 12 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
831 : : jit_instrumentation_len);
832 : 12 : jit_instrumentation->num_workers = nworkers;
833 : 12 : memset(jit_instrumentation->jit_instr, 0,
834 : : sizeof(JitInstrumentation) * nworkers);
835 : 12 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
836 : : jit_instrumentation);
837 : 12 : pei->jit_instrumentation = jit_instrumentation;
838 : : }
839 : : }
840 : :
841 : : /*
842 : : * Create a DSA area that can be used by the leader and all workers.
843 : : * (However, if we failed to create a DSM and are using private memory
844 : : * instead, then skip this.)
845 : : */
3284 rhaas@postgresql.org 846 [ + - ]: 378 : if (pcxt->seg != NULL)
847 : : {
848 : : char *area_space;
849 : :
850 : 378 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
851 : 378 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
852 : 378 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
853 : : LWTRANCHE_PARALLEL_QUERY_DSA,
854 : : pcxt->seg);
855 : :
856 : : /*
857 : : * Serialize parameters, if any, using DSA storage. We don't dare use
858 : : * the main parallel query DSM for this because we might relaunch
859 : : * workers after the values have changed (and thus the amount of
860 : : * storage required has changed).
861 : : */
2952 862 [ + + ]: 378 : if (!bms_is_empty(sendParams))
863 : : {
2920 864 : 12 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
865 : : pei->area);
2952 866 : 12 : fpes->param_exec = pei->param_exec;
867 : : }
868 : : }
869 : :
870 : : /*
871 : : * Give parallel-aware nodes a chance to initialize their shared data.
872 : : * This also initializes the elements of instrumentation->ps_instrument,
873 : : * if it exists.
874 : : */
3732 875 : 378 : d.pcxt = pcxt;
876 : 378 : d.instrumentation = instrumentation;
877 : 378 : d.nnodes = 0;
878 : :
879 : : /* Install our DSA area while initializing the plan. */
2920 880 : 378 : estate->es_query_dsa = pei->area;
3732 881 : 378 : ExecParallelInitializeDSM(planstate, &d);
2920 882 : 378 : estate->es_query_dsa = NULL;
883 : :
884 : : /*
885 : : * Make sure that the world hasn't shifted under our feet. This could
886 : : * probably just be an Assert(), but let's be conservative for now.
887 : : */
3732 888 [ - + ]: 378 : if (e.nnodes != d.nnodes)
3732 rhaas@postgresql.org 889 [ # # ]:UBC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
890 : :
891 : : /* OK, we're ready to rock and roll. */
3732 rhaas@postgresql.org 892 :CBC 378 : return pei;
893 : : }
894 : :
895 : : /*
896 : : * Set up tuple queue readers to read the results of a parallel subplan.
897 : : *
898 : : * This is separate from ExecInitParallelPlan() because we can launch the
899 : : * worker processes and let them start doing something before we do this.
900 : : */
901 : : void
3015 andres@anarazel.de 902 : 498 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
903 : : {
3028 tgl@sss.pgh.pa.us 904 : 498 : int nworkers = pei->pcxt->nworkers_launched;
905 : : int i;
906 : :
907 [ - + ]: 498 : Assert(pei->reader == NULL);
908 : :
909 [ + - ]: 498 : if (nworkers > 0)
910 : : {
911 : 498 : pei->reader = (TupleQueueReader **)
912 : 498 : palloc(nworkers * sizeof(TupleQueueReader *));
913 : :
914 [ + + ]: 1830 : for (i = 0; i < nworkers; i++)
915 : : {
916 : 1332 : shm_mq_set_handle(pei->tqueue[i],
917 : 1332 : pei->pcxt->worker[i].bgwhandle);
3015 andres@anarazel.de 918 : 1332 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
919 : : }
920 : : }
3028 tgl@sss.pgh.pa.us 921 : 498 : }
922 : :
923 : : /*
924 : : * Re-initialize the parallel executor shared memory state before launching
925 : : * a fresh batch of workers.
926 : : */
927 : : void
3030 928 : 129 : ExecParallelReinitialize(PlanState *planstate,
929 : : ParallelExecutorInfo *pei,
930 : : Bitmapset *sendParams)
931 : : {
2952 rhaas@postgresql.org 932 : 129 : EState *estate = planstate->state;
933 : : FixedParallelExecutorState *fpes;
934 : :
935 : : /* Old workers must already be shut down */
3030 tgl@sss.pgh.pa.us 936 [ - + ]: 129 : Assert(pei->finished);
937 : :
938 : : /*
939 : : * Force any initplan outputs that we're going to pass to workers to be
940 : : * evaluated, if they weren't already (see comments in
941 : : * ExecInitParallelPlan).
942 : : */
2649 943 [ + - ]: 129 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
944 : :
3030 945 : 129 : ReinitializeParallelDSM(pei->pcxt);
946 : 129 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
3028 947 : 129 : pei->reader = NULL;
3030 948 : 129 : pei->finished = false;
949 : :
2952 rhaas@postgresql.org 950 : 129 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
951 : :
952 : : /* Free any serialized parameters from the last round. */
953 [ - + ]: 129 : if (DsaPointerIsValid(fpes->param_exec))
954 : : {
2920 rhaas@postgresql.org 955 :UBC 0 : dsa_free(pei->area, fpes->param_exec);
2952 956 : 0 : fpes->param_exec = InvalidDsaPointer;
957 : : }
958 : :
959 : : /* Serialize current parameter values if required. */
2952 rhaas@postgresql.org 960 [ - + ]:CBC 129 : if (!bms_is_empty(sendParams))
961 : : {
2920 rhaas@postgresql.org 962 :UBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
963 : : pei->area);
2952 964 : 0 : fpes->param_exec = pei->param_exec;
965 : : }
966 : :
967 : : /* Traverse plan tree and let each child node reset associated state. */
2920 rhaas@postgresql.org 968 :CBC 129 : estate->es_query_dsa = pei->area;
3030 tgl@sss.pgh.pa.us 969 : 129 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
2920 rhaas@postgresql.org 970 : 129 : estate->es_query_dsa = NULL;
3030 tgl@sss.pgh.pa.us 971 : 129 : }
972 : :
973 : : /*
974 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
975 : : */
976 : : static bool
977 : 333 : ExecParallelReInitializeDSM(PlanState *planstate,
978 : : ParallelContext *pcxt)
979 : : {
980 [ - + ]: 333 : if (planstate == NULL)
3030 tgl@sss.pgh.pa.us 981 :UBC 0 : return false;
982 : :
983 : : /*
984 : : * Call reinitializers for DSM-using plan nodes.
985 : : */
3030 tgl@sss.pgh.pa.us 986 [ + + + - :CBC 333 : switch (nodeTag(planstate))
- - - + +
+ + ]
987 : : {
988 : 138 : case T_SeqScanState:
989 [ + + ]: 138 : if (planstate->plan->parallel_aware)
990 : 114 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
991 : : pcxt);
992 : 138 : break;
993 : 6 : case T_IndexScanState:
994 [ + - ]: 6 : if (planstate->plan->parallel_aware)
995 : 6 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
996 : : pcxt);
997 : 6 : break;
998 : 6 : case T_IndexOnlyScanState:
999 [ + - ]: 6 : if (planstate->plan->parallel_aware)
1000 : 6 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1001 : : pcxt);
1002 : 6 : break;
3030 tgl@sss.pgh.pa.us 1003 :UBC 0 : case T_ForeignScanState:
1004 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1005 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1006 : : pcxt);
1007 : 0 : break;
19 drowley@postgresql.o 1008 :UNC 0 : case T_TidRangeScanState:
1009 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1010 : 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1011 : : pcxt);
1012 : 0 : break;
2933 rhaas@postgresql.org 1013 :UBC 0 : case T_AppendState:
1014 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1015 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1016 : 0 : break;
3030 tgl@sss.pgh.pa.us 1017 : 0 : case T_CustomScanState:
1018 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1019 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1020 : : pcxt);
1021 : 0 : break;
3030 tgl@sss.pgh.pa.us 1022 :CBC 27 : case T_BitmapHeapScanState:
1023 [ + - ]: 27 : if (planstate->plan->parallel_aware)
1024 : 27 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1025 : : pcxt);
1026 : 27 : break;
2918 andres@anarazel.de 1027 : 48 : case T_HashJoinState:
1028 [ + + ]: 48 : if (planstate->plan->parallel_aware)
1029 : 24 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1030 : : pcxt);
1031 : 48 : break;
280 pg@bowt.ie 1032 : 90 : case T_BitmapIndexScanState:
1033 : : case T_HashState:
1034 : : case T_SortState:
1035 : : case T_IncrementalSortState:
1036 : : case T_MemoizeState:
1037 : : /* these nodes have DSM state, but no reinitialization is required */
3030 tgl@sss.pgh.pa.us 1038 : 90 : break;
1039 : :
1040 : 18 : default:
1041 : 18 : break;
1042 : : }
1043 : :
1044 : 333 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1045 : : }
1046 : :
1047 : : /*
1048 : : * Copy instrumentation information about this node and its descendants from
1049 : : * dynamic shared memory.
1050 : : */
1051 : : static bool
3732 rhaas@postgresql.org 1052 : 513 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1053 : : SharedExecutorInstrumentation *instrumentation)
1054 : : {
1055 : : Instrumentation *instrument;
1056 : : int i;
1057 : : int n;
1058 : : int ibytes;
3477 1059 : 513 : int plan_node_id = planstate->plan->plan_node_id;
1060 : : MemoryContext oldcontext;
1061 : :
1062 : : /* Find the instrumentation for this node. */
3660 1063 [ + - ]: 2319 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1064 [ + + ]: 2319 : if (instrumentation->plan_node_id[i] == plan_node_id)
3732 1065 : 513 : break;
3660 1066 [ - + ]: 513 : if (i >= instrumentation->num_plan_nodes)
3732 rhaas@postgresql.org 1067 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1068 : :
1069 : : /* Accumulate the statistics from all workers. */
3660 rhaas@postgresql.org 1070 :CBC 513 : instrument = GetInstrumentationArray(instrumentation);
1071 : 513 : instrument += i * instrumentation->num_workers;
1072 [ + + ]: 1353 : for (n = 0; n < instrumentation->num_workers; ++n)
1073 : 840 : InstrAggNode(planstate->instrument, &instrument[n]);
1074 : :
1075 : : /*
1076 : : * Also store the per-worker detail.
1077 : : *
1078 : : * Worker instrumentation should be allocated in the same context as the
1079 : : * regular instrumentation information, which is the per-query context.
1080 : : * Switch into per-query memory context.
1081 : : */
2925 1082 : 513 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1083 : 513 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1084 : 513 : planstate->worker_instrument =
1085 : 513 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1086 : 513 : MemoryContextSwitchTo(oldcontext);
1087 : :
3660 1088 : 513 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
2925 1089 : 513 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1090 : :
1091 : : /* Perform any node-type-specific work that needs to be done. */
2933 andres@anarazel.de 1092 [ + - - + : 513 : switch (nodeTag(planstate))
- + + - -
+ ]
1093 : : {
280 pg@bowt.ie 1094 : 135 : case T_IndexScanState:
1095 : 135 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1096 : 135 : break;
280 pg@bowt.ie 1097 :UBC 0 : case T_IndexOnlyScanState:
1098 : 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1099 : 0 : break;
1100 : 0 : case T_BitmapIndexScanState:
1101 : 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1102 : 0 : break;
2933 andres@anarazel.de 1103 :CBC 6 : case T_SortState:
1104 : 6 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1105 : 6 : break;
2080 tomas.vondra@postgre 1106 :UBC 0 : case T_IncrementalSortState:
1107 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1108 : 0 : break;
2933 andres@anarazel.de 1109 :CBC 42 : case T_HashState:
1110 : 42 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1111 : 42 : break;
2006 drowley@postgresql.o 1112 : 51 : case T_AggState:
1113 : 51 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1114 : 51 : break;
1616 drowley@postgresql.o 1115 :UBC 0 : case T_MemoizeState:
1116 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1719 1117 : 0 : break;
525 1118 : 0 : case T_BitmapHeapScanState:
1119 : 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1120 : 0 : break;
2933 andres@anarazel.de 1121 :CBC 279 : default:
1122 : 279 : break;
1123 : : }
1124 : :
3732 rhaas@postgresql.org 1125 : 513 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1126 : : instrumentation);
1127 : : }
1128 : :
1129 : : /*
1130 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1131 : : */
1132 : : static void
2639 andres@anarazel.de 1133 : 12 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1134 : : SharedJitInstrumentation *shared_jit)
1135 : : {
1136 : : JitInstrumentation *combined;
1137 : : int ibytes;
1138 : :
1139 : : int n;
1140 : :
1141 : : /*
1142 : : * Accumulate worker JIT instrumentation into the combined JIT
1143 : : * instrumentation, allocating it if required.
1144 : : */
2631 1145 [ + - ]: 12 : if (!planstate->state->es_jit_worker_instr)
1146 : 12 : planstate->state->es_jit_worker_instr =
2639 1147 : 12 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
2631 1148 : 12 : combined = planstate->state->es_jit_worker_instr;
1149 : :
1150 : : /* Accumulate all the workers' instrumentations. */
2639 1151 [ + + ]: 36 : for (n = 0; n < shared_jit->num_workers; ++n)
1152 : 24 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1153 : :
1154 : : /*
1155 : : * Store the per-worker detail.
1156 : : *
1157 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1158 : : * instrumentation in per-query context.
1159 : : */
1160 : 12 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
2400 tgl@sss.pgh.pa.us 1161 : 12 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
2639 andres@anarazel.de 1162 : 12 : planstate->worker_jit_instrument =
1163 : 12 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1164 : :
1165 : 12 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1166 : 12 : }
1167 : :
1168 : : /*
1169 : : * Finish parallel execution. We wait for parallel workers to finish, and
1170 : : * accumulate their buffer/WAL usage.
1171 : : */
1172 : : void
3732 rhaas@postgresql.org 1173 : 915 : ExecParallelFinish(ParallelExecutorInfo *pei)
1174 : : {
3028 tgl@sss.pgh.pa.us 1175 : 915 : int nworkers = pei->pcxt->nworkers_launched;
1176 : : int i;
1177 : :
1178 : : /* Make this be a no-op if called twice in a row. */
3681 rhaas@postgresql.org 1179 [ + + ]: 915 : if (pei->finished)
1180 : 414 : return;
1181 : :
1182 : : /*
1183 : : * Detach from tuple queues ASAP, so that any still-active workers will
1184 : : * notice that no further results are wanted.
1185 : : */
3028 tgl@sss.pgh.pa.us 1186 [ + - ]: 501 : if (pei->tqueue != NULL)
1187 : : {
1188 [ + + ]: 1827 : for (i = 0; i < nworkers; i++)
1189 : 1326 : shm_mq_detach(pei->tqueue[i]);
1190 : 501 : pfree(pei->tqueue);
1191 : 501 : pei->tqueue = NULL;
1192 : : }
1193 : :
1194 : : /*
1195 : : * While we're waiting for the workers to finish, let's get rid of the
1196 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1197 : : */
1198 [ + + ]: 501 : if (pei->reader != NULL)
1199 : : {
1200 [ + + ]: 1818 : for (i = 0; i < nworkers; i++)
1201 : 1326 : DestroyTupleQueueReader(pei->reader[i]);
1202 : 492 : pfree(pei->reader);
1203 : 492 : pei->reader = NULL;
1204 : : }
1205 : :
1206 : : /* Now wait for the workers to finish. */
3732 rhaas@postgresql.org 1207 : 501 : WaitForParallelWorkersToFinish(pei->pcxt);
1208 : :
1209 : : /*
1210 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1211 : : * finish, or we might get incomplete data.)
1212 : : */
3028 tgl@sss.pgh.pa.us 1213 [ + + ]: 1827 : for (i = 0; i < nworkers; i++)
2082 akapila@postgresql.o 1214 : 1326 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1215 : :
3681 rhaas@postgresql.org 1216 : 501 : pei->finished = true;
1217 : : }
1218 : :
1219 : : /*
1220 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1221 : : * resources still exist after ExecParallelFinish. We separate these
1222 : : * routines because someone might want to examine the contents of the DSM
1223 : : * after ExecParallelFinish and before calling this routine.
1224 : : */
1225 : : void
3714 1226 : 372 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1227 : : {
1228 : : /* Accumulate instrumentation, if any. */
2919 1229 [ + + ]: 372 : if (pei->instrumentation)
1230 : 90 : ExecParallelRetrieveInstrumentation(pei->planstate,
1231 : : pei->instrumentation);
1232 : :
1233 : : /* Accumulate JIT instrumentation, if any. */
2639 andres@anarazel.de 1234 [ + + ]: 372 : if (pei->jit_instrumentation)
1235 : 12 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
2400 tgl@sss.pgh.pa.us 1236 : 12 : pei->jit_instrumentation);
1237 : :
1238 : : /* Free any serialized parameters. */
2952 rhaas@postgresql.org 1239 [ + + ]: 372 : if (DsaPointerIsValid(pei->param_exec))
1240 : : {
1241 : 12 : dsa_free(pei->area, pei->param_exec);
1242 : 12 : pei->param_exec = InvalidDsaPointer;
1243 : : }
3284 1244 [ + - ]: 372 : if (pei->area != NULL)
1245 : : {
1246 : 372 : dsa_detach(pei->area);
1247 : 372 : pei->area = NULL;
1248 : : }
3714 1249 [ + - ]: 372 : if (pei->pcxt != NULL)
1250 : : {
1251 : 372 : DestroyParallelContext(pei->pcxt);
1252 : 372 : pei->pcxt = NULL;
1253 : : }
1254 : 372 : pfree(pei);
1255 : 372 : }
1256 : :
1257 : : /*
1258 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1259 : : * for that purpose.
1260 : : */
1261 : : static DestReceiver *
3732 1262 : 1332 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1263 : : {
1264 : : char *mqspace;
1265 : : shm_mq *mq;
1266 : :
3116 tgl@sss.pgh.pa.us 1267 : 1332 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
3732 rhaas@postgresql.org 1268 : 1332 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1269 : 1332 : mq = (shm_mq *) mqspace;
1270 : 1332 : shm_mq_set_sender(mq, MyProc);
1271 : 1332 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1272 : : }
1273 : :
1274 : : /*
1275 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1276 : : */
1277 : : static QueryDesc *
1278 : 1332 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1279 : : int instrument_options)
1280 : : {
1281 : : char *pstmtspace;
1282 : : char *paramspace;
1283 : : PlannedStmt *pstmt;
1284 : : ParamListInfo paramLI;
1285 : : char *queryString;
1286 : :
1287 : : /* Get the query string from shared memory */
3116 tgl@sss.pgh.pa.us 1288 : 1332 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1289 : :
1290 : : /* Reconstruct leader-supplied PlannedStmt. */
1291 : 1332 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
3732 rhaas@postgresql.org 1292 : 1332 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1293 : :
1294 : : /* Reconstruct ParamListInfo. */
2952 1295 : 1332 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
3732 1296 : 1332 : paramLI = RestoreParamList(¶mspace);
1297 : :
1298 : : /* Create a QueryDesc for the query. */
1299 : 1332 : return CreateQueryDesc(pstmt,
1300 : : queryString,
1301 : : GetActiveSnapshot(), InvalidSnapshot,
1302 : : receiver, paramLI, NULL, instrument_options);
1303 : : }
1304 : :
1305 : : /*
1306 : : * Copy instrumentation information from this node and its descendants into
1307 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1308 : : */
1309 : : static bool
1310 : 1188 : ExecParallelReportInstrumentation(PlanState *planstate,
1311 : : SharedExecutorInstrumentation *instrumentation)
1312 : : {
1313 : : int i;
3477 1314 : 1188 : int plan_node_id = planstate->plan->plan_node_id;
1315 : : Instrumentation *instrument;
1316 : :
3660 1317 : 1188 : InstrEndLoop(planstate->instrument);
1318 : :
1319 : : /*
1320 : : * If we shuffled the plan_node_id values in ps_instrument into sorted
1321 : : * order, we could use binary search here. This might matter someday if
1322 : : * we're pushing down sufficiently large plan trees. For now, do it the
1323 : : * slow, dumb way.
1324 : : */
1325 [ + - ]: 3906 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1326 [ + + ]: 3906 : if (instrumentation->plan_node_id[i] == plan_node_id)
3732 1327 : 1188 : break;
3660 1328 [ - + ]: 1188 : if (i >= instrumentation->num_plan_nodes)
3732 rhaas@postgresql.org 1329 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1330 : :
1331 : : /*
1332 : : * Add our statistics to the per-node, per-worker totals. It's possible
1333 : : * that this could happen more than once if we relaunched workers.
1334 : : */
3660 rhaas@postgresql.org 1335 :CBC 1188 : instrument = GetInstrumentationArray(instrumentation);
1336 : 1188 : instrument += i * instrumentation->num_workers;
1337 [ - + ]: 1188 : Assert(IsParallelWorker());
1338 [ - + ]: 1188 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1339 : 1188 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1340 : :
3732 1341 : 1188 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1342 : : instrumentation);
1343 : : }
1344 : :
1345 : : /*
1346 : : * Initialize the PlanState and its descendants with the information
1347 : : * retrieved from shared memory. This has to be done once the PlanState
1348 : : * is allocated and initialized by executor; that is, after ExecutorStart().
1349 : : */
1350 : : static bool
2952 andres@anarazel.de 1351 : 4226 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1352 : : {
3688 rhaas@postgresql.org 1353 [ - + ]: 4226 : if (planstate == NULL)
3688 rhaas@postgresql.org 1354 :UBC 0 : return false;
1355 : :
3031 rhaas@postgresql.org 1356 [ + + + + :CBC 4226 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
1357 : : {
1358 : 1685 : case T_SeqScanState:
1359 [ + + ]: 1685 : if (planstate->plan->parallel_aware)
2952 andres@anarazel.de 1360 : 1371 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
3031 rhaas@postgresql.org 1361 : 1685 : break;
1362 : 198 : case T_IndexScanState:
1363 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 1364 : 198 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
3031 rhaas@postgresql.org 1365 : 198 : break;
1366 : 121 : case T_IndexOnlyScanState:
1367 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
280 pg@bowt.ie 1368 : 121 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1369 : : pwcxt);
1370 : 121 : break;
1371 : 135 : case T_BitmapIndexScanState:
1372 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1373 : 135 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1374 : : pwcxt);
3031 rhaas@postgresql.org 1375 : 135 : break;
3031 rhaas@postgresql.org 1376 :UBC 0 : case T_ForeignScanState:
1377 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 1378 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1379 : : pwcxt);
3031 1380 : 0 : break;
19 drowley@postgresql.o 1381 :GNC 48 : case T_TidRangeScanState:
1382 [ + - ]: 48 : if (planstate->plan->parallel_aware)
1383 : 48 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1384 : : pwcxt);
1385 : 48 : break;
2933 rhaas@postgresql.org 1386 :CBC 189 : case T_AppendState:
1387 [ + + ]: 189 : if (planstate->plan->parallel_aware)
1388 : 159 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1389 : 189 : break;
3031 rhaas@postgresql.org 1390 :UBC 0 : case T_CustomScanState:
1391 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3604 1392 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1393 : : pwcxt);
3031 1394 : 0 : break;
3031 rhaas@postgresql.org 1395 :CBC 135 : case T_BitmapHeapScanState:
1396 [ + + ]: 135 : if (planstate->plan->parallel_aware)
2952 andres@anarazel.de 1397 : 134 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1398 : : pwcxt);
3031 rhaas@postgresql.org 1399 : 135 : break;
2918 andres@anarazel.de 1400 : 279 : case T_HashJoinState:
1401 [ + + ]: 279 : if (planstate->plan->parallel_aware)
1402 : 159 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1403 : : pwcxt);
1404 : 279 : break;
2933 1405 : 279 : case T_HashState:
1406 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1407 : 279 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1408 : 279 : break;
3031 rhaas@postgresql.org 1409 : 232 : case T_SortState:
1410 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
2952 andres@anarazel.de 1411 : 232 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
3030 tgl@sss.pgh.pa.us 1412 : 232 : break;
2080 tomas.vondra@postgre 1413 :UBC 0 : case T_IncrementalSortState:
1414 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1415 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1416 : : pwcxt);
1417 : 0 : break;
2006 drowley@postgresql.o 1418 :CBC 830 : case T_AggState:
1419 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1420 : 830 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1421 : 830 : break;
1616 1422 : 6 : case T_MemoizeState:
1423 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1424 : 6 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1719 1425 : 6 : break;
3031 rhaas@postgresql.org 1426 : 89 : default:
1427 : 89 : break;
1428 : : }
1429 : :
2952 andres@anarazel.de 1430 : 4226 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1431 : : pwcxt);
1432 : : }
1433 : :
1434 : : /*
1435 : : * Main entrypoint for parallel query worker processes.
1436 : : *
1437 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1438 : : * create a sensible parallel environment has already been done;
1439 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1440 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1441 : : * here.
1442 : : *
1443 : : * Our job is to deal with concerns specific to the executor. The parallel
1444 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1445 : : * to execute that plan and write the resulting tuples to the appropriate
1446 : : * tuple queue. Various bits of supporting information that we need in order
1447 : : * to do this are also stored in the dsm_segment and can be accessed through
1448 : : * the shm_toc.
1449 : : */
1450 : : void
3732 rhaas@postgresql.org 1451 : 1332 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1452 : : {
1453 : : FixedParallelExecutorState *fpes;
1454 : : BufferUsage *buffer_usage;
1455 : : WalUsage *wal_usage;
1456 : : DestReceiver *receiver;
1457 : : QueryDesc *queryDesc;
1458 : : SharedExecutorInstrumentation *instrumentation;
1459 : : SharedJitInstrumentation *jit_instrumentation;
1460 : 1332 : int instrument_options = 0;
1461 : : void *area_space;
1462 : : dsa_area *area;
1463 : : ParallelWorkerContext pwcxt;
1464 : :
1465 : : /* Get fixed-size state. */
3031 1466 : 1332 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1467 : :
1468 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
3732 1469 : 1332 : receiver = ExecParallelGetReceiver(seg, toc);
3116 tgl@sss.pgh.pa.us 1470 : 1332 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
3732 rhaas@postgresql.org 1471 [ + + ]: 1332 : if (instrumentation != NULL)
1472 : 363 : instrument_options = instrumentation->instrument_options;
2639 andres@anarazel.de 1473 : 1332 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1474 : : true);
3732 rhaas@postgresql.org 1475 : 1332 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1476 : :
1477 : : /* Setting debug_query_string for individual workers */
3219 1478 : 1332 : debug_query_string = queryDesc->sourceText;
1479 : :
1480 : : /* Report workers' query for monitoring purposes */
1481 : 1332 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1482 : :
1483 : : /* Attach to the dynamic shared memory area. */
3116 tgl@sss.pgh.pa.us 1484 : 1332 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
3284 rhaas@postgresql.org 1485 : 1332 : area = dsa_attach_in_place(area_space, seg);
1486 : :
1487 : : /* Start up the executor */
2826 andres@anarazel.de 1488 : 1332 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
208 amitlan@postgresql.o 1489 : 1332 : ExecutorStart(queryDesc, fpes->eflags);
1490 : :
1491 : : /* Special executor initialization steps for parallel workers */
3284 rhaas@postgresql.org 1492 : 1332 : queryDesc->planstate->state->es_query_dsa = area;
2952 1493 [ + + ]: 1332 : if (DsaPointerIsValid(fpes->param_exec))
1494 : : {
1495 : : char *paramexec_space;
1496 : :
1497 : 36 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1498 : 36 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1499 : : }
andres@anarazel.de 1500 : 1332 : pwcxt.toc = toc;
1501 : 1332 : pwcxt.seg = seg;
1502 : 1332 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1503 : :
1504 : : /* Pass down any tuple bound */
3031 rhaas@postgresql.org 1505 : 1332 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1506 : :
1507 : : /*
1508 : : * Prepare to track buffer/WAL usage during query execution.
1509 : : *
1510 : : * We do this after starting up the executor to match what happens in the
1511 : : * leader, which also doesn't count buffer accesses and WAL activity that
1512 : : * occur during executor startup.
1513 : : */
2692 akapila@postgresql.o 1514 : 1332 : InstrStartParallelQuery();
1515 : :
1516 : : /*
1517 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1518 : : * more tuples than that.
1519 : : */
3031 rhaas@postgresql.org 1520 : 1332 : ExecutorRun(queryDesc,
1521 : : ForwardScanDirection,
372 tgl@sss.pgh.pa.us 1522 : 1332 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1523 : :
1524 : : /* Shut down the executor */
3732 rhaas@postgresql.org 1525 : 1326 : ExecutorFinish(queryDesc);
1526 : :
1527 : : /* Report buffer/WAL usage during parallel execution. */
3116 tgl@sss.pgh.pa.us 1528 : 1326 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2082 akapila@postgresql.o 1529 : 1326 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1530 : 1326 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1531 : 1326 : &wal_usage[ParallelWorkerNumber]);
1532 : :
1533 : : /* Report instrumentation data if any instrumentation options are set. */
3732 rhaas@postgresql.org 1534 [ + + ]: 1326 : if (instrumentation != NULL)
1535 : 363 : ExecParallelReportInstrumentation(queryDesc->planstate,
1536 : : instrumentation);
1537 : :
1538 : : /* Report JIT instrumentation data if any */
2639 andres@anarazel.de 1539 [ - + - - ]: 1326 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1540 : : {
2639 andres@anarazel.de 1541 [ # # ]:UBC 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1542 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1543 : 0 : queryDesc->estate->es_jit->instr;
1544 : : }
1545 : :
1546 : : /* Must do this after capturing instrumentation. */
3730 rhaas@postgresql.org 1547 :CBC 1326 : ExecutorEnd(queryDesc);
1548 : :
1549 : : /* Cleanup. */
3284 1550 : 1326 : dsa_detach(area);
3732 1551 : 1326 : FreeQueryDesc(queryDesc);
3022 peter_e@gmx.net 1552 : 1326 : receiver->rDestroy(receiver);
3732 rhaas@postgresql.org 1553 : 1326 : }
|