Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeBitmapIndexscan.h"
32 : : #include "executor/nodeCustom.h"
33 : : #include "executor/nodeForeignscan.h"
34 : : #include "executor/nodeHash.h"
35 : : #include "executor/nodeHashjoin.h"
36 : : #include "executor/nodeIncrementalSort.h"
37 : : #include "executor/nodeIndexonlyscan.h"
38 : : #include "executor/nodeIndexscan.h"
39 : : #include "executor/nodeMemoize.h"
40 : : #include "executor/nodeSeqscan.h"
41 : : #include "executor/nodeSort.h"
42 : : #include "executor/nodeSubplan.h"
43 : : #include "executor/nodeTidrangescan.h"
44 : : #include "executor/tqueue.h"
45 : : #include "jit/jit.h"
46 : : #include "nodes/nodeFuncs.h"
47 : : #include "pgstat.h"
48 : : #include "storage/proc.h"
49 : : #include "tcop/tcopprot.h"
50 : : #include "utils/datum.h"
51 : : #include "utils/dsa.h"
52 : : #include "utils/lsyscache.h"
53 : : #include "utils/snapmgr.h"
54 : :
55 : : /*
56 : : * Magic numbers for parallel executor communication. We use constants
57 : : * greater than any 32-bit integer here so that values < 2^32 can be used
58 : : * by individual parallel nodes to store their own state.
59 : : */
60 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
61 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
62 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
63 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
64 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
65 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
66 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
67 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
68 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
69 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
70 : :
71 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
72 : :
73 : : /*
74 : : * Fixed-size random stuff that we need to pass to parallel workers.
75 : : */
76 : : typedef struct FixedParallelExecutorState
77 : : {
78 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
79 : : dsa_pointer param_exec;
80 : : int eflags;
81 : : int jit_flags;
82 : : } FixedParallelExecutorState;
83 : :
84 : : /*
85 : : * DSM structure for accumulating per-PlanState instrumentation.
86 : : *
87 : : * instrument_options: Same meaning here as in instrument.c.
88 : : *
89 : : * instrument_offset: Offset, relative to the start of this structure,
90 : : * of the first NodeInstrumentation object. This will depend on the length of
91 : : * the plan_node_id array.
92 : : *
93 : : * num_workers: Number of workers.
94 : : *
95 : : * num_plan_nodes: Number of plan nodes.
96 : : *
97 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
98 : : * from parallel workers. The length of this array is given by num_plan_nodes.
99 : : */
100 : : struct SharedExecutorInstrumentation
101 : : {
102 : : int instrument_options;
103 : : int instrument_offset;
104 : : int num_workers;
105 : : int num_plan_nodes;
106 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
107 : :
108 : : /*
109 : : * Array of num_plan_nodes * num_workers NodeInstrumentation objects
110 : : * follows.
111 : : */
112 : : };
113 : : #define GetInstrumentationArray(sei) \
114 : : (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
115 : : (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
116 : :
117 : : /* Context object for ExecParallelEstimate. */
118 : : typedef struct ExecParallelEstimateContext
119 : : {
120 : : ParallelContext *pcxt;
121 : : int nnodes;
122 : : } ExecParallelEstimateContext;
123 : :
124 : : /* Context object for ExecParallelInitializeDSM. */
125 : : typedef struct ExecParallelInitializeDSMContext
126 : : {
127 : : ParallelContext *pcxt;
128 : : SharedExecutorInstrumentation *instrumentation;
129 : : int nnodes;
130 : : } ExecParallelInitializeDSMContext;
131 : :
132 : : /* Helper functions that run in the parallel leader. */
133 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
134 : : static bool ExecParallelEstimate(PlanState *planstate,
135 : : ExecParallelEstimateContext *e);
136 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
137 : : ExecParallelInitializeDSMContext *d);
138 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
139 : : bool reinitialize);
140 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
141 : : ParallelContext *pcxt);
142 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
143 : : SharedExecutorInstrumentation *instrumentation);
144 : :
145 : : /* Helper function that runs in the parallel worker. */
146 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
147 : :
148 : : /*
149 : : * Create a serialized representation of the plan to be sent to each worker.
150 : : */
151 : : static char *
3870 rhaas@postgresql.org 152 :CBC 524 : ExecSerializePlan(Plan *plan, EState *estate)
153 : : {
154 : : PlannedStmt *pstmt;
155 : : ListCell *lc;
156 : :
157 : : /* We can't scribble on the original plan, so make a copy. */
3872 158 : 524 : plan = copyObject(plan);
159 : :
160 : : /*
161 : : * The worker will start its own copy of the executor, and that copy will
162 : : * insert a junk filter if the toplevel node has any resjunk entries. We
163 : : * don't want that to happen, because while resjunk columns shouldn't be
164 : : * sent back to the user, here the tuples are coming back to another
165 : : * backend which may very well need them. So mutate the target list
166 : : * accordingly. This is sort of a hack; there might be better ways to do
167 : : * this...
168 : : */
3310 tgl@sss.pgh.pa.us 169 [ + + + + : 1437 : foreach(lc, plan->targetlist)
+ + ]
170 : : {
171 : 913 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
172 : :
3872 rhaas@postgresql.org 173 : 913 : tle->resjunk = false;
174 : : }
175 : :
176 : : /*
177 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
178 : : * for our purposes, but the worker will need at least a minimal
179 : : * PlannedStmt to start the executor.
180 : : */
181 : 524 : pstmt = makeNode(PlannedStmt);
182 : 524 : pstmt->commandType = CMD_SELECT;
1841 bruce@momjian.us 183 : 524 : pstmt->queryId = pgstat_get_my_query_id();
407 michael@paquier.xyz 184 : 524 : pstmt->planId = pgstat_get_my_plan_id();
3581 tgl@sss.pgh.pa.us 185 : 524 : pstmt->hasReturning = false;
186 : 524 : pstmt->hasModifyingCTE = false;
187 : 524 : pstmt->canSetTag = true;
188 : 524 : pstmt->transientPlan = false;
189 : 524 : pstmt->dependsOnRole = false;
190 : 524 : pstmt->parallelModeNeeded = false;
3872 rhaas@postgresql.org 191 : 524 : pstmt->planTree = plan;
460 amitlan@postgresql.o 192 : 524 : pstmt->partPruneInfos = estate->es_part_prune_infos;
3870 rhaas@postgresql.org 193 : 524 : pstmt->rtable = estate->es_range_table;
452 amitlan@postgresql.o 194 : 524 : pstmt->unprunableRelids = estate->es_unpruned_relids;
1246 alvherre@alvh.no-ip. 195 : 524 : pstmt->permInfos = estate->es_rteperminfos;
2337 tgl@sss.pgh.pa.us 196 : 524 : pstmt->appendRelations = NIL;
278 michael@paquier.xyz 197 :GNC 524 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
198 : :
199 : : /*
200 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
201 : : * for unsafe ones (so that the list indexes of the safe ones are
202 : : * preserved). This positively ensures that the worker won't try to run,
203 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
204 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
205 : : */
3310 tgl@sss.pgh.pa.us 206 :CBC 524 : pstmt->subplans = NIL;
207 [ + + + + : 560 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
208 : : {
209 : 36 : Plan *subplan = (Plan *) lfirst(lc);
210 : :
211 [ + - + + ]: 36 : if (subplan && !subplan->parallel_safe)
212 : 8 : subplan = NULL;
213 : 36 : pstmt->subplans = lappend(pstmt->subplans, subplan);
214 : : }
215 : :
3872 rhaas@postgresql.org 216 : 524 : pstmt->rewindPlanIDs = NULL;
217 : 524 : pstmt->rowMarks = NIL;
218 : :
219 : : /*
220 : : * Pass the row mark and result relation relids to parallel workers. They
221 : : * may need to check them to inform heuristics.
222 : : */
36 melanieplageman@gmai 223 :GNC 524 : pstmt->rowMarkRelids = estate->es_plannedstmt->rowMarkRelids;
224 : 524 : pstmt->resultRelationRelids = estate->es_plannedstmt->resultRelationRelids;
3872 rhaas@postgresql.org 225 :CBC 524 : pstmt->relationOids = NIL;
226 : 524 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
3095 227 : 524 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
3398 tgl@sss.pgh.pa.us 228 : 524 : pstmt->utilityStmt = NULL;
229 : 524 : pstmt->stmt_location = -1;
230 : 524 : pstmt->stmt_len = -1;
231 : :
232 : : /* Return serialized copy of our dummy PlannedStmt. */
3872 rhaas@postgresql.org 233 : 524 : return nodeToString(pstmt);
234 : : }
235 : :
236 : : /*
237 : : * Parallel-aware plan nodes (and occasionally others) may need some state
238 : : * which is shared across all parallel workers. Before we size the DSM, give
239 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
240 : : * &pcxt->estimator.
241 : : *
242 : : * While we're at it, count the number of PlanState nodes in the tree, so
243 : : * we know how many Instrumentation structures we need.
244 : : */
245 : : static bool
246 : 3297 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
247 : : {
248 [ - + ]: 3297 : if (planstate == NULL)
3872 rhaas@postgresql.org 249 :UBC 0 : return false;
250 : :
251 : : /* Count this node. */
3872 rhaas@postgresql.org 252 :CBC 3297 : e->nnodes++;
253 : :
3171 254 [ + + + + : 3297 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
255 : : {
256 : 1529 : case T_SeqScanState:
257 [ + + ]: 1529 : if (planstate->plan->parallel_aware)
3758 258 : 1191 : ExecSeqScanEstimate((SeqScanState *) planstate,
259 : : e->pcxt);
260 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 261 :GNC 1529 : ExecSeqScanInstrumentEstimate((SeqScanState *) planstate,
262 : : e->pcxt);
3171 rhaas@postgresql.org 263 :CBC 1529 : break;
264 : 276 : case T_IndexScanState:
29 melanieplageman@gmai 265 [ + + ]:GNC 276 : if (planstate->plan->parallel_aware)
266 : 12 : ExecIndexScanEstimate((IndexScanState *) planstate,
267 : : e->pcxt);
268 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
269 : 276 : ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
270 : : e->pcxt);
3171 rhaas@postgresql.org 271 :CBC 276 : break;
272 : 42 : case T_IndexOnlyScanState:
29 melanieplageman@gmai 273 [ + + ]:GNC 42 : if (planstate->plan->parallel_aware)
274 : 30 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
275 : : e->pcxt);
276 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
277 : 42 : ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
278 : : e->pcxt);
420 pg@bowt.ie 279 :CBC 42 : break;
280 : 13 : case T_BitmapIndexScanState:
281 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
282 : 13 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
283 : : e->pcxt);
3171 rhaas@postgresql.org 284 : 13 : break;
3171 rhaas@postgresql.org 285 :UBC 0 : case T_ForeignScanState:
286 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 287 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
288 : : e->pcxt);
3171 289 : 0 : break;
159 drowley@postgresql.o 290 :GNC 16 : case T_TidRangeScanState:
291 [ + - ]: 16 : if (planstate->plan->parallel_aware)
292 : 16 : ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
293 : : e->pcxt);
294 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 295 : 16 : ExecTidRangeScanInstrumentEstimate((TidRangeScanState *) planstate,
296 : : e->pcxt);
159 drowley@postgresql.o 297 : 16 : break;
3073 rhaas@postgresql.org 298 :CBC 148 : case T_AppendState:
299 [ + + ]: 148 : if (planstate->plan->parallel_aware)
300 : 108 : ExecAppendEstimate((AppendState *) planstate,
301 : : e->pcxt);
302 : 148 : break;
3171 rhaas@postgresql.org 303 :UBC 0 : case T_CustomScanState:
304 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 305 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
306 : : e->pcxt);
3171 307 : 0 : break;
3171 rhaas@postgresql.org 308 :CBC 13 : case T_BitmapHeapScanState:
309 [ + + ]: 13 : if (planstate->plan->parallel_aware)
3345 310 : 12 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
311 : : e->pcxt);
312 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 313 :GNC 13 : ExecBitmapHeapInstrumentEstimate((BitmapHeapScanState *) planstate,
314 : : e->pcxt);
3171 rhaas@postgresql.org 315 :CBC 13 : break;
3058 andres@anarazel.de 316 : 208 : case T_HashJoinState:
317 [ + + ]: 208 : if (planstate->plan->parallel_aware)
318 : 84 : ExecHashJoinEstimate((HashJoinState *) planstate,
319 : : e->pcxt);
320 : 208 : break;
3073 321 : 208 : case T_HashState:
322 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
323 : 208 : ExecHashEstimate((HashState *) planstate, e->pcxt);
324 : 208 : break;
3171 rhaas@postgresql.org 325 : 197 : case T_SortState:
326 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
327 : 197 : ExecSortEstimate((SortState *) planstate, e->pcxt);
3170 tgl@sss.pgh.pa.us 328 : 197 : break;
2220 tomas.vondra@postgre 329 :UBC 0 : case T_IncrementalSortState:
330 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
331 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
332 : 0 : break;
2146 drowley@postgresql.o 333 :CBC 393 : case T_AggState:
334 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
335 : 393 : ExecAggEstimate((AggState *) planstate, e->pcxt);
336 : 393 : break;
1756 337 : 4 : case T_MemoizeState:
338 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
339 : 4 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
1859 340 : 4 : break;
3171 rhaas@postgresql.org 341 : 250 : default:
342 : 250 : break;
343 : : }
344 : :
3872 345 : 3297 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
346 : : }
347 : :
348 : : /*
349 : : * Estimate the amount of space required to serialize the indicated parameters.
350 : : */
351 : : static Size
3092 352 : 16 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
353 : : {
354 : : int paramid;
355 : 16 : Size sz = sizeof(int);
356 : :
357 : 16 : paramid = -1;
358 [ + + ]: 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
359 : : {
360 : : Oid typeOid;
361 : : int16 typLen;
362 : : bool typByVal;
363 : : ParamExecData *prm;
364 : :
365 : 20 : prm = &(estate->es_param_exec_vals[paramid]);
366 : 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
367 : : paramid);
368 : :
369 : 20 : sz = add_size(sz, sizeof(int)); /* space for paramid */
370 : :
371 : : /* space for datum/isnull */
372 [ + - ]: 20 : if (OidIsValid(typeOid))
373 : 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
374 : : else
375 : : {
376 : : /* If no type OID, assume by-value, like copyParamList does. */
3092 rhaas@postgresql.org 377 :UBC 0 : typLen = sizeof(Datum);
378 : 0 : typByVal = true;
379 : : }
3092 rhaas@postgresql.org 380 :CBC 20 : sz = add_size(sz,
381 : 20 : datumEstimateSpace(prm->value, prm->isnull,
382 : : typByVal, typLen));
383 : : }
384 : 16 : return sz;
385 : : }
386 : :
387 : : /*
388 : : * Serialize specified PARAM_EXEC parameters.
389 : : *
390 : : * We write the number of parameters first, as a 4-byte integer, and then
391 : : * write details for each parameter in turn. The details for each parameter
392 : : * consist of a 4-byte paramid (location of param in execution time internal
393 : : * parameter array) and then the datum as serialized by datumSerialize().
394 : : */
395 : : static dsa_pointer
3060 396 : 16 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
397 : : {
398 : : Size size;
399 : : int nparams;
400 : : int paramid;
401 : : ParamExecData *prm;
402 : : dsa_pointer handle;
403 : : char *start_address;
404 : :
405 : : /* Allocate enough space for the current parameter values. */
3092 406 : 16 : size = EstimateParamExecSpace(estate, params);
3060 407 : 16 : handle = dsa_allocate(area, size);
408 : 16 : start_address = dsa_get_address(area, handle);
409 : :
410 : : /* First write the number of parameters as a 4-byte integer. */
3092 411 : 16 : nparams = bms_num_members(params);
412 : 16 : memcpy(start_address, &nparams, sizeof(int));
413 : 16 : start_address += sizeof(int);
414 : :
415 : : /* Write details for each parameter in turn. */
416 : 16 : paramid = -1;
417 [ + + ]: 36 : while ((paramid = bms_next_member(params, paramid)) >= 0)
418 : : {
419 : : Oid typeOid;
420 : : int16 typLen;
421 : : bool typByVal;
422 : :
423 : 20 : prm = &(estate->es_param_exec_vals[paramid]);
424 : 20 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
425 : : paramid);
426 : :
427 : : /* Write paramid. */
428 : 20 : memcpy(start_address, ¶mid, sizeof(int));
429 : 20 : start_address += sizeof(int);
430 : :
431 : : /* Write datum/isnull */
432 [ + - ]: 20 : if (OidIsValid(typeOid))
433 : 20 : get_typlenbyval(typeOid, &typLen, &typByVal);
434 : : else
435 : : {
436 : : /* If no type OID, assume by-value, like copyParamList does. */
3092 rhaas@postgresql.org 437 :UBC 0 : typLen = sizeof(Datum);
438 : 0 : typByVal = true;
439 : : }
3092 rhaas@postgresql.org 440 :CBC 20 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
441 : : &start_address);
442 : : }
443 : :
444 : 16 : return handle;
445 : : }
446 : :
447 : : /*
448 : : * Restore specified PARAM_EXEC parameters.
449 : : */
450 : : static void
451 : 48 : RestoreParamExecParams(char *start_address, EState *estate)
452 : : {
453 : : int nparams;
454 : : int i;
455 : : int paramid;
456 : :
457 : 48 : memcpy(&nparams, start_address, sizeof(int));
458 : 48 : start_address += sizeof(int);
459 : :
460 [ + + ]: 104 : for (i = 0; i < nparams; i++)
461 : : {
462 : : ParamExecData *prm;
463 : :
464 : : /* Read paramid */
465 : 56 : memcpy(¶mid, start_address, sizeof(int));
466 : 56 : start_address += sizeof(int);
467 : 56 : prm = &(estate->es_param_exec_vals[paramid]);
468 : :
469 : : /* Read datum/isnull. */
470 : 56 : prm->value = datumRestore(&start_address, &prm->isnull);
471 : 56 : prm->execPlan = NULL;
472 : : }
473 : 48 : }
474 : :
475 : : /*
476 : : * Initialize the dynamic shared memory segment that will be used to control
477 : : * parallel execution.
478 : : */
479 : : static bool
3872 480 : 3297 : ExecParallelInitializeDSM(PlanState *planstate,
481 : : ExecParallelInitializeDSMContext *d)
482 : : {
483 [ - + ]: 3297 : if (planstate == NULL)
3872 rhaas@postgresql.org 484 :UBC 0 : return false;
485 : :
486 : : /* If instrumentation is enabled, initialize slot for this node. */
3872 rhaas@postgresql.org 487 [ + + ]:CBC 3297 : if (d->instrumentation != NULL)
3800 488 : 684 : d->instrumentation->plan_node_id[d->nnodes] =
489 : 684 : planstate->plan->plan_node_id;
490 : :
491 : : /* Count this node. */
3872 492 : 3297 : d->nnodes++;
493 : :
494 : : /*
495 : : * Call initializers for DSM-using plan nodes.
496 : : *
497 : : * Most plan nodes won't do anything here, but plan nodes that allocated
498 : : * DSM may need to initialize shared state in the DSM before parallel
499 : : * workers are launched. They can allocate the space they previously
500 : : * estimated using shm_toc_allocate, and add the keys they previously
501 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
502 : : */
3171 503 [ + + + + : 3297 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
504 : : {
505 : 1529 : case T_SeqScanState:
506 [ + + ]: 1529 : if (planstate->plan->parallel_aware)
3758 507 : 1191 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
508 : : d->pcxt);
509 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 510 :GNC 1529 : ExecSeqScanInstrumentInitDSM((SeqScanState *) planstate,
511 : : d->pcxt);
3171 rhaas@postgresql.org 512 :CBC 1529 : break;
513 : 276 : case T_IndexScanState:
29 melanieplageman@gmai 514 [ + + ]:GNC 276 : if (planstate->plan->parallel_aware)
515 : 12 : ExecIndexScanInitializeDSM((IndexScanState *) planstate,
516 : : d->pcxt);
517 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
518 : 276 : ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
519 : : d->pcxt);
3171 rhaas@postgresql.org 520 :CBC 276 : break;
521 : 42 : case T_IndexOnlyScanState:
29 melanieplageman@gmai 522 [ + + ]:GNC 42 : if (planstate->plan->parallel_aware)
523 : 30 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
524 : : d->pcxt);
525 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
526 : 42 : ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
527 : : d->pcxt);
420 pg@bowt.ie 528 :CBC 42 : break;
529 : 13 : case T_BitmapIndexScanState:
530 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
531 : 13 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
3171 rhaas@postgresql.org 532 : 13 : break;
3171 rhaas@postgresql.org 533 :UBC 0 : case T_ForeignScanState:
534 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 535 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
536 : : d->pcxt);
3171 537 : 0 : break;
159 drowley@postgresql.o 538 :GNC 16 : case T_TidRangeScanState:
539 [ + - ]: 16 : if (planstate->plan->parallel_aware)
540 : 16 : ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
541 : : d->pcxt);
542 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 543 : 16 : ExecTidRangeScanInstrumentInitDSM((TidRangeScanState *) planstate,
544 : : d->pcxt);
159 drowley@postgresql.o 545 : 16 : break;
3073 rhaas@postgresql.org 546 :CBC 148 : case T_AppendState:
547 [ + + ]: 148 : if (planstate->plan->parallel_aware)
548 : 108 : ExecAppendInitializeDSM((AppendState *) planstate,
549 : : d->pcxt);
550 : 148 : break;
3171 rhaas@postgresql.org 551 :UBC 0 : case T_CustomScanState:
552 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 553 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
554 : : d->pcxt);
3171 555 : 0 : break;
3171 rhaas@postgresql.org 556 :CBC 13 : case T_BitmapHeapScanState:
557 [ + + ]: 13 : if (planstate->plan->parallel_aware)
3345 558 : 12 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
559 : : d->pcxt);
560 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 561 :GNC 13 : ExecBitmapHeapInstrumentInitDSM((BitmapHeapScanState *) planstate,
562 : : d->pcxt);
3171 rhaas@postgresql.org 563 :CBC 13 : break;
3058 andres@anarazel.de 564 : 208 : case T_HashJoinState:
565 [ + + ]: 208 : if (planstate->plan->parallel_aware)
566 : 84 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
567 : : d->pcxt);
568 : 208 : break;
3073 569 : 208 : case T_HashState:
570 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
571 : 208 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
572 : 208 : break;
3171 rhaas@postgresql.org 573 : 197 : case T_SortState:
574 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
575 : 197 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
3170 tgl@sss.pgh.pa.us 576 : 197 : break;
2220 tomas.vondra@postgre 577 :UBC 0 : case T_IncrementalSortState:
578 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
579 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
580 : 0 : break;
2146 drowley@postgresql.o 581 :CBC 393 : case T_AggState:
582 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
583 : 393 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
584 : 393 : break;
1756 585 : 4 : case T_MemoizeState:
586 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
587 : 4 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
1859 588 : 4 : break;
3171 rhaas@postgresql.org 589 : 250 : default:
590 : 250 : break;
591 : : }
592 : :
3872 593 : 3297 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
594 : : }
595 : :
596 : : /*
597 : : * It sets up the response queues for backend workers to return tuples
598 : : * to the main backend and start the workers.
599 : : */
600 : : static shm_mq_handle **
3840 601 : 696 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
602 : : {
603 : : shm_mq_handle **responseq;
604 : : char *tqueuespace;
605 : : int i;
606 : :
607 : : /* Skip this if no workers. */
3872 608 [ - + ]: 696 : if (pcxt->nworkers == 0)
3872 rhaas@postgresql.org 609 :UBC 0 : return NULL;
610 : :
611 : : /* Allocate memory for shared memory queue handles. */
612 : : responseq = (shm_mq_handle **)
3872 rhaas@postgresql.org 613 :CBC 696 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
614 : :
615 : : /*
616 : : * If not reinitializing, allocate space from the DSM for the queues;
617 : : * otherwise, find the already allocated space.
618 : : */
3840 619 [ + + ]: 696 : if (!reinitialize)
620 : : tqueuespace =
621 : 524 : shm_toc_allocate(pcxt->toc,
622 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
3651 623 : 524 : pcxt->nworkers));
624 : : else
3256 tgl@sss.pgh.pa.us 625 : 172 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
626 : :
627 : : /* Create the queues, and become the receiver for each. */
3872 rhaas@postgresql.org 628 [ + + ]: 2567 : for (i = 0; i < pcxt->nworkers; ++i)
629 : : {
630 : : shm_mq *mq;
631 : :
3651 632 : 1871 : mq = shm_mq_create(tqueuespace +
633 : 1871 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
634 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
635 : :
3872 636 : 1871 : shm_mq_set_receiver(mq, MyProc);
637 : 1871 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
638 : : }
639 : :
640 : : /* Add array of queues to shm_toc, so others can find it. */
3840 641 [ + + ]: 696 : if (!reinitialize)
642 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
643 : :
644 : : /* Return array of handles. */
3872 645 : 696 : return responseq;
646 : : }
647 : :
648 : : /*
649 : : * Sets up the required infrastructure for backend workers to perform
650 : : * execution and return results to the main backend.
651 : : */
652 : : ParallelExecutorInfo *
3092 653 : 524 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
654 : : Bitmapset *sendParams, int nworkers,
655 : : int64 tuples_needed)
656 : : {
657 : : ParallelExecutorInfo *pei;
658 : : ParallelContext *pcxt;
659 : : ExecParallelEstimateContext e;
660 : : ExecParallelInitializeDSMContext d;
661 : : FixedParallelExecutorState *fpes;
662 : : char *pstmt_data;
663 : : char *pstmt_space;
664 : : char *paramlistinfo_space;
665 : : BufferUsage *bufusage_space;
666 : : WalUsage *walusage_space;
3872 667 : 524 : SharedExecutorInstrumentation *instrumentation = NULL;
2779 andres@anarazel.de 668 : 524 : SharedJitInstrumentation *jit_instrumentation = NULL;
669 : : int pstmt_len;
670 : : int paramlistinfo_len;
3872 rhaas@postgresql.org 671 : 524 : int instrumentation_len = 0;
2779 andres@anarazel.de 672 : 524 : int jit_instrumentation_len = 0;
3800 rhaas@postgresql.org 673 : 524 : int instrument_offset = 0;
3424 674 : 524 : Size dsa_minsize = dsa_minimum_size();
675 : : char *query_string;
676 : : int query_len;
677 : :
678 : : /*
679 : : * Force any initplan outputs that we're going to pass to workers to be
680 : : * evaluated, if they weren't already.
681 : : *
682 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
683 : : * That risks intra-query memory leakage, since we might pass through here
684 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
685 : : * doesn't normally leak any memory in the context (see its comments), so
686 : : * it doesn't seem worth complicating this function's API to pass it a
687 : : * shorter-lived ExprContext. This might need to change someday.
688 : : */
2789 tgl@sss.pgh.pa.us 689 [ + + ]: 524 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
690 : :
691 : : /* Allocate object for return value. */
146 michael@paquier.xyz 692 :GNC 524 : pei = palloc0_object(ParallelExecutorInfo);
3821 rhaas@postgresql.org 693 :CBC 524 : pei->finished = false;
3872 694 : 524 : pei->planstate = planstate;
695 : :
696 : : /* Fix up and serialize plan to be sent to workers. */
3870 697 : 524 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
698 : :
699 : : /* Create a parallel context. */
2608 tmunro@postgresql.or 700 : 524 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
3872 rhaas@postgresql.org 701 : 524 : pei->pcxt = pcxt;
702 : :
703 : : /*
704 : : * Before telling the parallel context to create a dynamic shared memory
705 : : * segment, we need to figure out how big it should be. Estimate space
706 : : * for the various things we need to store.
707 : : */
708 : :
709 : : /* Estimate space for fixed-size state. */
3171 710 : 524 : shm_toc_estimate_chunk(&pcxt->estimator,
711 : : sizeof(FixedParallelExecutorState));
712 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
713 : :
714 : : /* Estimate space for query text. */
1846 tgl@sss.pgh.pa.us 715 : 524 : query_len = strlen(estate->es_sourceText);
3058 rhaas@postgresql.org 716 : 524 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
3359 717 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
718 : :
719 : : /* Estimate space for serialized PlannedStmt. */
3872 720 : 524 : pstmt_len = strlen(pstmt_data) + 1;
721 : 524 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
722 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
723 : :
724 : : /* Estimate space for serialized ParamListInfo. */
3092 725 : 524 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
726 : 524 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
3872 727 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
728 : :
729 : : /*
730 : : * Estimate space for BufferUsage.
731 : : *
732 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
733 : : * we could skip this. But we have no way of knowing whether anyone's
734 : : * looking at pgBufferUsage, so do it unconditionally.
735 : : */
736 : 524 : shm_toc_estimate_chunk(&pcxt->estimator,
737 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
738 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
739 : :
740 : : /*
741 : : * Same thing for WalUsage.
742 : : */
2222 akapila@postgresql.o 743 : 524 : shm_toc_estimate_chunk(&pcxt->estimator,
744 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
745 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
746 : :
747 : : /* Estimate space for tuple queues. */
3872 rhaas@postgresql.org 748 : 524 : shm_toc_estimate_chunk(&pcxt->estimator,
749 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
750 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
751 : :
752 : : /*
753 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
754 : : * count of how many PlanState nodes there are.
755 : : */
756 : 524 : e.pcxt = pcxt;
757 : 524 : e.nnodes = 0;
758 : 524 : ExecParallelEstimate(planstate, &e);
759 : :
760 : : /* Estimate space for instrumentation, if required. */
761 [ + + ]: 524 : if (estate->es_instrument)
762 : : {
763 : 120 : instrumentation_len =
764 : : offsetof(SharedExecutorInstrumentation, plan_node_id) +
3654 765 : 120 : sizeof(int) * e.nnodes;
3800 766 : 120 : instrumentation_len = MAXALIGN(instrumentation_len);
767 : 120 : instrument_offset = instrumentation_len;
3651 768 : 120 : instrumentation_len +=
30 andres@anarazel.de 769 :GNC 120 : mul_size(sizeof(NodeInstrumentation),
3651 rhaas@postgresql.org 770 :CBC 120 : mul_size(e.nnodes, nworkers));
3872 771 : 120 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
772 : 120 : shm_toc_estimate_keys(&pcxt->estimator, 1);
773 : :
774 : : /* Estimate space for JIT instrumentation, if required. */
2779 andres@anarazel.de 775 [ - + ]: 120 : if (estate->es_jit_flags != PGJIT_NONE)
776 : : {
2779 andres@anarazel.de 777 :LBC (12) : jit_instrumentation_len =
778 : (12) : offsetof(SharedJitInstrumentation, jit_instr) +
779 : : sizeof(JitInstrumentation) * nworkers;
780 : (12) : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
781 : (12) : shm_toc_estimate_keys(&pcxt->estimator, 1);
782 : : }
783 : : }
784 : :
785 : : /* Estimate space for DSA area. */
3424 rhaas@postgresql.org 786 :CBC 524 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
787 : 524 : shm_toc_estimate_keys(&pcxt->estimator, 1);
788 : :
789 : : /*
790 : : * InitializeParallelDSM() passes the active snapshot to the parallel
791 : : * worker, which uses it to set es_snapshot. Make sure we don't set
792 : : * es_snapshot differently in the child.
793 : : */
782 heikki.linnakangas@i 794 [ - + ]: 524 : Assert(GetActiveSnapshot() == estate->es_snapshot);
795 : :
796 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
3872 rhaas@postgresql.org 797 : 524 : InitializeParallelDSM(pcxt);
798 : :
799 : : /*
800 : : * OK, now we have a dynamic shared memory segment, and it should be big
801 : : * enough to store all of the data we estimated we would want to put into
802 : : * it, plus whatever general stuff (not specifically executor-related) the
803 : : * ParallelContext itself needs to store there. None of the space we
804 : : * asked for has been allocated or initialized yet, though, so do that.
805 : : */
806 : :
807 : : /* Store fixed-size state. */
3171 808 : 524 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
809 : 524 : fpes->tuples_needed = tuples_needed;
3092 810 : 524 : fpes->param_exec = InvalidDsaPointer;
3088 811 : 524 : fpes->eflags = estate->es_top_eflags;
2966 andres@anarazel.de 812 : 524 : fpes->jit_flags = estate->es_jit_flags;
3171 rhaas@postgresql.org 813 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
814 : :
815 : : /* Store query string */
3058 816 : 524 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
1846 tgl@sss.pgh.pa.us 817 : 524 : memcpy(query_string, estate->es_sourceText, query_len + 1);
3359 rhaas@postgresql.org 818 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
819 : :
820 : : /* Store serialized PlannedStmt. */
3872 821 : 524 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
822 : 524 : memcpy(pstmt_space, pstmt_data, pstmt_len);
823 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
824 : :
825 : : /* Store serialized ParamListInfo. */
3092 826 : 524 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
827 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
828 : 524 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
829 : :
830 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
3872 831 : 524 : bufusage_space = shm_toc_allocate(pcxt->toc,
3240 tgl@sss.pgh.pa.us 832 : 524 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
3872 rhaas@postgresql.org 833 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
834 : 524 : pei->buffer_usage = bufusage_space;
835 : :
836 : : /* Same for WalUsage. */
2222 akapila@postgresql.o 837 : 524 : walusage_space = shm_toc_allocate(pcxt->toc,
838 : 524 : mul_size(sizeof(WalUsage), pcxt->nworkers));
839 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
840 : 524 : pei->wal_usage = walusage_space;
841 : :
842 : : /* Set up the tuple queues that the workers will write into. */
3840 rhaas@postgresql.org 843 : 524 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
844 : :
845 : : /* We don't need the TupleQueueReaders yet, though. */
3168 tgl@sss.pgh.pa.us 846 : 524 : pei->reader = NULL;
847 : :
848 : : /*
849 : : * If instrumentation options were supplied, allocate space for the data.
850 : : * It only gets partially initialized here; the rest happens during
851 : : * ExecParallelInitializeDSM.
852 : : */
3872 rhaas@postgresql.org 853 [ + + ]: 524 : if (estate->es_instrument)
854 : : {
855 : : NodeInstrumentation *instrument;
856 : : int i;
857 : :
858 : 120 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
859 : 120 : instrumentation->instrument_options = estate->es_instrument;
3800 860 : 120 : instrumentation->instrument_offset = instrument_offset;
861 : 120 : instrumentation->num_workers = nworkers;
862 : 120 : instrumentation->num_plan_nodes = e.nnodes;
863 : 120 : instrument = GetInstrumentationArray(instrumentation);
864 [ + + ]: 1240 : for (i = 0; i < nworkers * e.nnodes; ++i)
30 andres@anarazel.de 865 :GNC 1120 : InstrInitNode(&instrument[i], estate->es_instrument, false);
3872 rhaas@postgresql.org 866 :CBC 120 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
867 : : instrumentation);
868 : 120 : pei->instrumentation = instrumentation;
869 : :
2779 andres@anarazel.de 870 [ - + ]: 120 : if (estate->es_jit_flags != PGJIT_NONE)
871 : : {
2779 andres@anarazel.de 872 :LBC (12) : jit_instrumentation = shm_toc_allocate(pcxt->toc,
873 : : jit_instrumentation_len);
874 : (12) : jit_instrumentation->num_workers = nworkers;
875 : (12) : memset(jit_instrumentation->jit_instr, 0,
876 : : sizeof(JitInstrumentation) * nworkers);
877 : (12) : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
878 : : jit_instrumentation);
879 : (12) : pei->jit_instrumentation = jit_instrumentation;
880 : : }
881 : : }
882 : :
883 : : /*
884 : : * Create a DSA area that can be used by the leader and all workers.
885 : : * (However, if we failed to create a DSM and are using private memory
886 : : * instead, then skip this.)
887 : : */
3424 rhaas@postgresql.org 888 [ + - ]:CBC 524 : if (pcxt->seg != NULL)
889 : : {
890 : : char *area_space;
891 : :
892 : 524 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
893 : 524 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
894 : 524 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
895 : : LWTRANCHE_PARALLEL_QUERY_DSA,
896 : : pcxt->seg);
897 : :
898 : : /*
899 : : * Serialize parameters, if any, using DSA storage. We don't dare use
900 : : * the main parallel query DSM for this because we might relaunch
901 : : * workers after the values have changed (and thus the amount of
902 : : * storage required has changed).
903 : : */
3092 904 [ + + ]: 524 : if (!bms_is_empty(sendParams))
905 : : {
3060 906 : 16 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
907 : : pei->area);
3092 908 : 16 : fpes->param_exec = pei->param_exec;
909 : : }
910 : : }
911 : :
912 : : /*
913 : : * Give parallel-aware nodes a chance to initialize their shared data.
914 : : * This also initializes the elements of instrumentation->ps_instrument,
915 : : * if it exists.
916 : : */
3872 917 : 524 : d.pcxt = pcxt;
918 : 524 : d.instrumentation = instrumentation;
919 : 524 : d.nnodes = 0;
920 : :
921 : : /* Install our DSA area while initializing the plan. */
3060 922 : 524 : estate->es_query_dsa = pei->area;
3872 923 : 524 : ExecParallelInitializeDSM(planstate, &d);
3060 924 : 524 : estate->es_query_dsa = NULL;
925 : :
926 : : /*
927 : : * Make sure that the world hasn't shifted under our feet. This could
928 : : * probably just be an Assert(), but let's be conservative for now.
929 : : */
3872 930 [ - + ]: 524 : if (e.nnodes != d.nnodes)
3872 rhaas@postgresql.org 931 [ # # ]:UBC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
932 : :
933 : : /* OK, we're ready to rock and roll. */
3872 rhaas@postgresql.org 934 :CBC 524 : return pei;
935 : : }
936 : :
937 : : /*
938 : : * Set up tuple queue readers to read the results of a parallel subplan.
939 : : *
940 : : * This is separate from ExecInitParallelPlan() because we can launch the
941 : : * worker processes and let them start doing something before we do this.
942 : : */
943 : : void
3155 andres@anarazel.de 944 : 683 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
945 : : {
3168 tgl@sss.pgh.pa.us 946 : 683 : int nworkers = pei->pcxt->nworkers_launched;
947 : : int i;
948 : :
949 [ - + ]: 683 : Assert(pei->reader == NULL);
950 : :
951 [ + - ]: 683 : if (nworkers > 0)
952 : : {
953 : 683 : pei->reader = (TupleQueueReader **)
954 : 683 : palloc(nworkers * sizeof(TupleQueueReader *));
955 : :
956 [ + + ]: 2499 : for (i = 0; i < nworkers; i++)
957 : : {
958 : 1816 : shm_mq_set_handle(pei->tqueue[i],
959 : 1816 : pei->pcxt->worker[i].bgwhandle);
3155 andres@anarazel.de 960 : 1816 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
961 : : }
962 : : }
3168 tgl@sss.pgh.pa.us 963 : 683 : }
964 : :
965 : : /*
966 : : * Re-initialize the parallel executor shared memory state before launching
967 : : * a fresh batch of workers.
968 : : */
969 : : void
3170 970 : 172 : ExecParallelReinitialize(PlanState *planstate,
971 : : ParallelExecutorInfo *pei,
972 : : Bitmapset *sendParams)
973 : : {
3092 rhaas@postgresql.org 974 : 172 : EState *estate = planstate->state;
975 : : FixedParallelExecutorState *fpes;
976 : :
977 : : /* Old workers must already be shut down */
3170 tgl@sss.pgh.pa.us 978 [ - + ]: 172 : Assert(pei->finished);
979 : :
980 : : /*
981 : : * Force any initplan outputs that we're going to pass to workers to be
982 : : * evaluated, if they weren't already (see comments in
983 : : * ExecInitParallelPlan).
984 : : */
2789 985 [ + - ]: 172 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
986 : :
3170 987 : 172 : ReinitializeParallelDSM(pei->pcxt);
988 : 172 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
3168 989 : 172 : pei->reader = NULL;
3170 990 : 172 : pei->finished = false;
991 : :
3092 rhaas@postgresql.org 992 : 172 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
993 : :
994 : : /* Free any serialized parameters from the last round. */
995 [ - + ]: 172 : if (DsaPointerIsValid(fpes->param_exec))
996 : : {
3060 rhaas@postgresql.org 997 :UBC 0 : dsa_free(pei->area, fpes->param_exec);
3092 998 : 0 : fpes->param_exec = InvalidDsaPointer;
999 : : }
1000 : :
1001 : : /* Serialize current parameter values if required. */
3092 rhaas@postgresql.org 1002 [ - + ]:CBC 172 : if (!bms_is_empty(sendParams))
1003 : : {
3060 rhaas@postgresql.org 1004 :UBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
1005 : : pei->area);
3092 1006 : 0 : fpes->param_exec = pei->param_exec;
1007 : : }
1008 : :
1009 : : /* Traverse plan tree and let each child node reset associated state. */
3060 rhaas@postgresql.org 1010 :CBC 172 : estate->es_query_dsa = pei->area;
3170 tgl@sss.pgh.pa.us 1011 : 172 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
3060 rhaas@postgresql.org 1012 : 172 : estate->es_query_dsa = NULL;
3170 tgl@sss.pgh.pa.us 1013 : 172 : }
1014 : :
1015 : : /*
1016 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
1017 : : */
1018 : : static bool
1019 : 444 : ExecParallelReInitializeDSM(PlanState *planstate,
1020 : : ParallelContext *pcxt)
1021 : : {
1022 [ - + ]: 444 : if (planstate == NULL)
3170 tgl@sss.pgh.pa.us 1023 :UBC 0 : return false;
1024 : :
1025 : : /*
1026 : : * Call reinitializers for DSM-using plan nodes.
1027 : : */
3170 tgl@sss.pgh.pa.us 1028 [ + + + - :CBC 444 : switch (nodeTag(planstate))
- - - + +
+ + ]
1029 : : {
1030 : 184 : case T_SeqScanState:
1031 [ + + ]: 184 : if (planstate->plan->parallel_aware)
1032 : 152 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
1033 : : pcxt);
1034 : 184 : break;
1035 : 8 : case T_IndexScanState:
1036 [ + - ]: 8 : if (planstate->plan->parallel_aware)
1037 : 8 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
1038 : : pcxt);
1039 : 8 : break;
1040 : 8 : case T_IndexOnlyScanState:
1041 [ + - ]: 8 : if (planstate->plan->parallel_aware)
1042 : 8 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
1043 : : pcxt);
1044 : 8 : break;
3170 tgl@sss.pgh.pa.us 1045 :UBC 0 : case T_ForeignScanState:
1046 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1047 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
1048 : : pcxt);
1049 : 0 : break;
159 drowley@postgresql.o 1050 :UNC 0 : case T_TidRangeScanState:
1051 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1052 : 0 : ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
1053 : : pcxt);
1054 : 0 : break;
3073 rhaas@postgresql.org 1055 :UBC 0 : case T_AppendState:
1056 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1057 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1058 : 0 : break;
3170 tgl@sss.pgh.pa.us 1059 : 0 : case T_CustomScanState:
1060 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1061 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1062 : : pcxt);
1063 : 0 : break;
3170 tgl@sss.pgh.pa.us 1064 :CBC 36 : case T_BitmapHeapScanState:
1065 [ + - ]: 36 : if (planstate->plan->parallel_aware)
1066 : 36 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1067 : : pcxt);
1068 : 36 : break;
3058 andres@anarazel.de 1069 : 64 : case T_HashJoinState:
1070 [ + + ]: 64 : if (planstate->plan->parallel_aware)
1071 : 32 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1072 : : pcxt);
1073 : 64 : break;
420 pg@bowt.ie 1074 : 120 : case T_BitmapIndexScanState:
1075 : : case T_HashState:
1076 : : case T_SortState:
1077 : : case T_IncrementalSortState:
1078 : : case T_MemoizeState:
1079 : : /* these nodes have DSM state, but no reinitialization is required */
3170 tgl@sss.pgh.pa.us 1080 : 120 : break;
1081 : :
1082 : 24 : default:
1083 : 24 : break;
1084 : : }
1085 : :
1086 : 444 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1087 : : }
1088 : :
1089 : : /*
1090 : : * Copy instrumentation information about this node and its descendants from
1091 : : * dynamic shared memory.
1092 : : */
1093 : : static bool
3872 rhaas@postgresql.org 1094 : 684 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1095 : : SharedExecutorInstrumentation *instrumentation)
1096 : : {
1097 : : NodeInstrumentation *instrument;
1098 : : int i;
1099 : : int n;
1100 : : int ibytes;
3617 1101 : 684 : int plan_node_id = planstate->plan->plan_node_id;
1102 : : MemoryContext oldcontext;
1103 : :
1104 : : /* Find the instrumentation for this node. */
3800 1105 [ + - ]: 3092 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1106 [ + + ]: 3092 : if (instrumentation->plan_node_id[i] == plan_node_id)
3872 1107 : 684 : break;
3800 1108 [ - + ]: 684 : if (i >= instrumentation->num_plan_nodes)
3872 rhaas@postgresql.org 1109 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1110 : :
1111 : : /* Accumulate the statistics from all workers. */
3800 rhaas@postgresql.org 1112 :CBC 684 : instrument = GetInstrumentationArray(instrumentation);
1113 : 684 : instrument += i * instrumentation->num_workers;
1114 [ + + ]: 1804 : for (n = 0; n < instrumentation->num_workers; ++n)
1115 : 1120 : InstrAggNode(planstate->instrument, &instrument[n]);
1116 : :
1117 : : /*
1118 : : * Also store the per-worker detail.
1119 : : *
1120 : : * Worker instrumentation should be allocated in the same context as the
1121 : : * regular instrumentation information, which is the per-query context.
1122 : : * Switch into per-query memory context.
1123 : : */
3065 1124 : 684 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
30 andres@anarazel.de 1125 :GNC 684 : ibytes = mul_size(instrumentation->num_workers, sizeof(NodeInstrumentation));
3065 rhaas@postgresql.org 1126 :CBC 684 : planstate->worker_instrument =
30 andres@anarazel.de 1127 :GNC 684 : palloc(ibytes + offsetof(WorkerNodeInstrumentation, instrument));
3065 rhaas@postgresql.org 1128 :CBC 684 : MemoryContextSwitchTo(oldcontext);
1129 : :
3800 1130 : 684 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
3065 1131 : 684 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1132 : :
1133 : : /* Perform any node-type-specific work that needs to be done. */
3073 andres@anarazel.de 1134 [ + - - + : 684 : switch (nodeTag(planstate))
- + + - -
+ - + ]
1135 : : {
420 pg@bowt.ie 1136 : 180 : case T_IndexScanState:
1137 : 180 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1138 : 180 : break;
420 pg@bowt.ie 1139 :UBC 0 : case T_IndexOnlyScanState:
1140 : 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1141 : 0 : break;
1142 : 0 : case T_BitmapIndexScanState:
1143 : 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1144 : 0 : break;
3073 andres@anarazel.de 1145 :CBC 8 : case T_SortState:
1146 : 8 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1147 : 8 : break;
2220 tomas.vondra@postgre 1148 :UBC 0 : case T_IncrementalSortState:
1149 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1150 : 0 : break;
3073 andres@anarazel.de 1151 :CBC 56 : case T_HashState:
1152 : 56 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1153 : 56 : break;
2146 drowley@postgresql.o 1154 : 68 : case T_AggState:
1155 : 68 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1156 : 68 : break;
1756 drowley@postgresql.o 1157 :UBC 0 : case T_MemoizeState:
1158 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1859 1159 : 0 : break;
665 1160 : 0 : case T_BitmapHeapScanState:
1161 : 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1162 : 0 : break;
28 tomas.vondra@postgre 1163 :GNC 232 : case T_SeqScanState:
1164 : 232 : ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
1165 : 232 : break;
28 tomas.vondra@postgre 1166 :UNC 0 : case T_TidRangeScanState:
1167 : 0 : ExecTidRangeScanRetrieveInstrumentation((TidRangeScanState *) planstate);
1168 : 0 : break;
3073 andres@anarazel.de 1169 :CBC 140 : default:
1170 : 140 : break;
1171 : : }
1172 : :
3872 rhaas@postgresql.org 1173 : 684 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1174 : : instrumentation);
1175 : : }
1176 : :
1177 : : /*
1178 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1179 : : */
1180 : : static void
2779 andres@anarazel.de 1181 :LBC (12) : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1182 : : SharedJitInstrumentation *shared_jit)
1183 : : {
1184 : : JitInstrumentation *combined;
1185 : : int ibytes;
1186 : :
1187 : : int n;
1188 : :
1189 : : /*
1190 : : * Accumulate worker JIT instrumentation into the combined JIT
1191 : : * instrumentation, allocating it if required.
1192 : : */
2771 1193 [ # # ]: (12) : if (!planstate->state->es_jit_worker_instr)
1194 : (12) : planstate->state->es_jit_worker_instr =
2779 1195 : (12) : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
2771 1196 : (12) : combined = planstate->state->es_jit_worker_instr;
1197 : :
1198 : : /* Accumulate all the workers' instrumentations. */
2779 1199 [ # # ]: (36) : for (n = 0; n < shared_jit->num_workers; ++n)
1200 : (24) : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1201 : :
1202 : : /*
1203 : : * Store the per-worker detail.
1204 : : *
1205 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1206 : : * instrumentation in per-query context.
1207 : : */
1208 : (12) : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
2540 tgl@sss.pgh.pa.us 1209 : (12) : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
2779 andres@anarazel.de 1210 : (12) : planstate->worker_jit_instrument =
1211 : (12) : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1212 : :
1213 : (12) : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1214 : (12) : }
1215 : :
1216 : : /*
1217 : : * Finish parallel execution. We wait for parallel workers to finish, and
1218 : : * accumulate their buffer/WAL usage.
1219 : : */
1220 : : void
3872 rhaas@postgresql.org 1221 :CBC 1248 : ExecParallelFinish(ParallelExecutorInfo *pei)
1222 : : {
3168 tgl@sss.pgh.pa.us 1223 : 1248 : int nworkers = pei->pcxt->nworkers_launched;
1224 : : int i;
1225 : :
1226 : : /* Make this be a no-op if called twice in a row. */
3821 rhaas@postgresql.org 1227 [ + + ]: 1248 : if (pei->finished)
1228 : 560 : return;
1229 : :
1230 : : /*
1231 : : * Detach from tuple queues ASAP, so that any still-active workers will
1232 : : * notice that no further results are wanted.
1233 : : */
3168 tgl@sss.pgh.pa.us 1234 [ + - ]: 688 : if (pei->tqueue != NULL)
1235 : : {
1236 [ + + ]: 2496 : for (i = 0; i < nworkers; i++)
1237 : 1808 : shm_mq_detach(pei->tqueue[i]);
1238 : 688 : pfree(pei->tqueue);
1239 : 688 : pei->tqueue = NULL;
1240 : : }
1241 : :
1242 : : /*
1243 : : * While we're waiting for the workers to finish, let's get rid of the
1244 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1245 : : */
1246 [ + + ]: 688 : if (pei->reader != NULL)
1247 : : {
1248 [ + + ]: 2483 : for (i = 0; i < nworkers; i++)
1249 : 1808 : DestroyTupleQueueReader(pei->reader[i]);
1250 : 675 : pfree(pei->reader);
1251 : 675 : pei->reader = NULL;
1252 : : }
1253 : :
1254 : : /* Now wait for the workers to finish. */
3872 rhaas@postgresql.org 1255 : 688 : WaitForParallelWorkersToFinish(pei->pcxt);
1256 : :
1257 : : /*
1258 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1259 : : * finish, or we might get incomplete data.)
1260 : : */
3168 tgl@sss.pgh.pa.us 1261 [ + + ]: 2496 : for (i = 0; i < nworkers; i++)
2222 akapila@postgresql.o 1262 : 1808 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1263 : :
3821 rhaas@postgresql.org 1264 : 688 : pei->finished = true;
1265 : : }
1266 : :
1267 : : /*
1268 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1269 : : * resources still exist after ExecParallelFinish. We separate these
1270 : : * routines because someone might want to examine the contents of the DSM
1271 : : * after ExecParallelFinish and before calling this routine.
1272 : : */
1273 : : void
3854 1274 : 516 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1275 : : {
1276 : : /* Accumulate instrumentation, if any. */
3059 1277 [ + + ]: 516 : if (pei->instrumentation)
1278 : 120 : ExecParallelRetrieveInstrumentation(pei->planstate,
1279 : : pei->instrumentation);
1280 : :
1281 : : /* Accumulate JIT instrumentation, if any. */
2779 andres@anarazel.de 1282 [ - + ]: 516 : if (pei->jit_instrumentation)
2779 andres@anarazel.de 1283 :LBC (12) : ExecParallelRetrieveJitInstrumentation(pei->planstate,
2540 tgl@sss.pgh.pa.us 1284 : (12) : pei->jit_instrumentation);
1285 : :
1286 : : /* Free any serialized parameters. */
3092 rhaas@postgresql.org 1287 [ + + ]:CBC 516 : if (DsaPointerIsValid(pei->param_exec))
1288 : : {
1289 : 16 : dsa_free(pei->area, pei->param_exec);
1290 : 16 : pei->param_exec = InvalidDsaPointer;
1291 : : }
3424 1292 [ + - ]: 516 : if (pei->area != NULL)
1293 : : {
1294 : 516 : dsa_detach(pei->area);
1295 : 516 : pei->area = NULL;
1296 : : }
3854 1297 [ + - ]: 516 : if (pei->pcxt != NULL)
1298 : : {
1299 : 516 : DestroyParallelContext(pei->pcxt);
1300 : 516 : pei->pcxt = NULL;
1301 : : }
1302 : 516 : pfree(pei);
1303 : 516 : }
1304 : :
1305 : : /*
1306 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1307 : : * for that purpose.
1308 : : */
1309 : : static DestReceiver *
3872 1310 : 1816 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1311 : : {
1312 : : char *mqspace;
1313 : : shm_mq *mq;
1314 : :
3256 tgl@sss.pgh.pa.us 1315 : 1816 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
3872 rhaas@postgresql.org 1316 : 1816 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1317 : 1816 : mq = (shm_mq *) mqspace;
1318 : 1816 : shm_mq_set_sender(mq, MyProc);
1319 : 1816 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1320 : : }
1321 : :
1322 : : /*
1323 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1324 : : */
1325 : : static QueryDesc *
1326 : 1816 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1327 : : int instrument_options)
1328 : : {
1329 : : char *pstmtspace;
1330 : : char *paramspace;
1331 : : PlannedStmt *pstmt;
1332 : : ParamListInfo paramLI;
1333 : : char *queryString;
1334 : :
1335 : : /* Get the query string from shared memory */
3256 tgl@sss.pgh.pa.us 1336 : 1816 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1337 : :
1338 : : /* Reconstruct leader-supplied PlannedStmt. */
1339 : 1816 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
3872 rhaas@postgresql.org 1340 : 1816 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1341 : :
1342 : : /* Reconstruct ParamListInfo. */
3092 1343 : 1816 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
3872 1344 : 1816 : paramLI = RestoreParamList(¶mspace);
1345 : :
1346 : : /* Create a QueryDesc for the query. */
1347 : 1816 : return CreateQueryDesc(pstmt,
1348 : : queryString,
1349 : : GetActiveSnapshot(), InvalidSnapshot,
1350 : : receiver, paramLI, NULL, instrument_options);
1351 : : }
1352 : :
1353 : : /*
1354 : : * Copy instrumentation information from this node and its descendants into
1355 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1356 : : */
1357 : : static bool
1358 : 1584 : ExecParallelReportInstrumentation(PlanState *planstate,
1359 : : SharedExecutorInstrumentation *instrumentation)
1360 : : {
1361 : : int i;
3617 1362 : 1584 : int plan_node_id = planstate->plan->plan_node_id;
1363 : : NodeInstrumentation *instrument;
1364 : :
3800 1365 : 1584 : InstrEndLoop(planstate->instrument);
1366 : :
1367 : : /*
1368 : : * If we shuffled the plan_node_id values in ps_instrument into sorted
1369 : : * order, we could use binary search here. This might matter someday if
1370 : : * we're pushing down sufficiently large plan trees. For now, do it the
1371 : : * slow, dumb way.
1372 : : */
1373 [ + - ]: 5208 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1374 [ + + ]: 5208 : if (instrumentation->plan_node_id[i] == plan_node_id)
3872 1375 : 1584 : break;
3800 1376 [ - + ]: 1584 : if (i >= instrumentation->num_plan_nodes)
3872 rhaas@postgresql.org 1377 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1378 : :
1379 : : /*
1380 : : * Add our statistics to the per-node, per-worker totals. It's possible
1381 : : * that this could happen more than once if we relaunched workers.
1382 : : */
3800 rhaas@postgresql.org 1383 :CBC 1584 : instrument = GetInstrumentationArray(instrumentation);
1384 : 1584 : instrument += i * instrumentation->num_workers;
1385 [ - + ]: 1584 : Assert(IsParallelWorker());
1386 [ - + ]: 1584 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1387 : 1584 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1388 : :
3872 1389 : 1584 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1390 : : instrumentation);
1391 : : }
1392 : :
1393 : : /*
1394 : : * Initialize the PlanState and its descendants with the information
1395 : : * retrieved from shared memory. This has to be done once the PlanState
1396 : : * is allocated and initialized by executor; that is, after ExecutorStart().
1397 : : */
1398 : : static bool
3092 andres@anarazel.de 1399 : 8149 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1400 : : {
3828 rhaas@postgresql.org 1401 [ - + ]: 8149 : if (planstate == NULL)
3828 rhaas@postgresql.org 1402 :UBC 0 : return false;
1403 : :
3171 rhaas@postgresql.org 1404 [ + + + + :CBC 8149 : switch (nodeTag(planstate))
- + + - +
+ + + - +
+ + ]
1405 : : {
1406 : 3749 : case T_SeqScanState:
1407 [ + + ]: 3749 : if (planstate->plan->parallel_aware)
3092 andres@anarazel.de 1408 : 2971 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
1409 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 1410 :GNC 3749 : ExecSeqScanInstrumentInitWorker((SeqScanState *) planstate, pwcxt);
3171 rhaas@postgresql.org 1411 :CBC 3749 : break;
1412 : 424 : case T_IndexScanState:
29 melanieplageman@gmai 1413 [ + + ]:GNC 424 : if (planstate->plan->parallel_aware)
1414 : 80 : ExecIndexScanInitializeWorker((IndexScanState *) planstate,
1415 : : pwcxt);
1416 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1417 : 424 : ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
1418 : : pwcxt);
3171 rhaas@postgresql.org 1419 :CBC 424 : break;
1420 : 168 : case T_IndexOnlyScanState:
29 melanieplageman@gmai 1421 [ + + ]:GNC 168 : if (planstate->plan->parallel_aware)
1422 : 136 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1423 : : pwcxt);
1424 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1425 : 168 : ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
1426 : : pwcxt);
420 pg@bowt.ie 1427 :CBC 168 : break;
1428 : 181 : case T_BitmapIndexScanState:
1429 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1430 : 181 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1431 : : pwcxt);
3171 rhaas@postgresql.org 1432 : 181 : break;
3171 rhaas@postgresql.org 1433 :UBC 0 : case T_ForeignScanState:
1434 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 1435 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1436 : : pwcxt);
3171 1437 : 0 : break;
159 drowley@postgresql.o 1438 :GNC 64 : case T_TidRangeScanState:
1439 [ + - ]: 64 : if (planstate->plan->parallel_aware)
1440 : 64 : ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
1441 : : pwcxt);
1442 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 1443 : 64 : ExecTidRangeScanInstrumentInitWorker((TidRangeScanState *) planstate,
1444 : : pwcxt);
159 drowley@postgresql.o 1445 : 64 : break;
3073 rhaas@postgresql.org 1446 :CBC 298 : case T_AppendState:
1447 [ + + ]: 298 : if (planstate->plan->parallel_aware)
1448 : 242 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1449 : 298 : break;
3171 rhaas@postgresql.org 1450 :UBC 0 : case T_CustomScanState:
1451 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3744 1452 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1453 : : pwcxt);
3171 1454 : 0 : break;
3171 rhaas@postgresql.org 1455 :CBC 181 : case T_BitmapHeapScanState:
1456 [ + + ]: 181 : if (planstate->plan->parallel_aware)
3092 andres@anarazel.de 1457 : 180 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1458 : : pwcxt);
1459 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
28 tomas.vondra@postgre 1460 :GNC 181 : ExecBitmapHeapInstrumentInitWorker((BitmapHeapScanState *) planstate,
1461 : : pwcxt);
3171 rhaas@postgresql.org 1462 :CBC 181 : break;
3058 andres@anarazel.de 1463 : 524 : case T_HashJoinState:
1464 [ + + ]: 524 : if (planstate->plan->parallel_aware)
1465 : 212 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1466 : : pwcxt);
1467 : 524 : break;
3073 1468 : 524 : case T_HashState:
1469 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1470 : 524 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1471 : 524 : break;
3171 rhaas@postgresql.org 1472 : 491 : case T_SortState:
1473 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
3092 andres@anarazel.de 1474 : 491 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
3170 tgl@sss.pgh.pa.us 1475 : 491 : break;
2220 tomas.vondra@postgre 1476 :UBC 0 : case T_IncrementalSortState:
1477 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1478 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1479 : : pwcxt);
1480 : 0 : break;
2146 drowley@postgresql.o 1481 :CBC 1107 : case T_AggState:
1482 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1483 : 1107 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1484 : 1107 : break;
1756 1485 : 8 : case T_MemoizeState:
1486 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1487 : 8 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1859 1488 : 8 : break;
3171 rhaas@postgresql.org 1489 : 430 : default:
1490 : 430 : break;
1491 : : }
1492 : :
3092 andres@anarazel.de 1493 : 8149 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1494 : : pwcxt);
1495 : : }
1496 : :
1497 : : /*
1498 : : * Main entrypoint for parallel query worker processes.
1499 : : *
1500 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1501 : : * create a sensible parallel environment has already been done;
1502 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1503 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1504 : : * here.
1505 : : *
1506 : : * Our job is to deal with concerns specific to the executor. The parallel
1507 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1508 : : * to execute that plan and write the resulting tuples to the appropriate
1509 : : * tuple queue. Various bits of supporting information that we need in order
1510 : : * to do this are also stored in the dsm_segment and can be accessed through
1511 : : * the shm_toc.
1512 : : */
1513 : : void
3872 rhaas@postgresql.org 1514 : 1816 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1515 : : {
1516 : : FixedParallelExecutorState *fpes;
1517 : : BufferUsage *buffer_usage;
1518 : : WalUsage *wal_usage;
1519 : : DestReceiver *receiver;
1520 : : QueryDesc *queryDesc;
1521 : : SharedExecutorInstrumentation *instrumentation;
1522 : : SharedJitInstrumentation *jit_instrumentation;
1523 : 1816 : int instrument_options = 0;
1524 : : void *area_space;
1525 : : dsa_area *area;
1526 : : ParallelWorkerContext pwcxt;
1527 : :
1528 : : /* Get fixed-size state. */
3171 1529 : 1816 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1530 : :
1531 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
3872 1532 : 1816 : receiver = ExecParallelGetReceiver(seg, toc);
3256 tgl@sss.pgh.pa.us 1533 : 1816 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
3872 rhaas@postgresql.org 1534 [ + + ]: 1816 : if (instrumentation != NULL)
1535 : 484 : instrument_options = instrumentation->instrument_options;
2779 andres@anarazel.de 1536 : 1816 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1537 : : true);
3872 rhaas@postgresql.org 1538 : 1816 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1539 : :
1540 : : /* Setting debug_query_string for individual workers */
3359 1541 : 1816 : debug_query_string = queryDesc->sourceText;
1542 : :
1543 : : /* Report workers' query for monitoring purposes */
1544 : 1816 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1545 : :
1546 : : /* Attach to the dynamic shared memory area. */
3256 tgl@sss.pgh.pa.us 1547 : 1816 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
3424 rhaas@postgresql.org 1548 : 1816 : area = dsa_attach_in_place(area_space, seg);
1549 : :
1550 : : /* Start up the executor */
2966 andres@anarazel.de 1551 : 1816 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
348 amitlan@postgresql.o 1552 : 1816 : ExecutorStart(queryDesc, fpes->eflags);
1553 : :
1554 : : /* Special executor initialization steps for parallel workers */
3424 rhaas@postgresql.org 1555 : 1816 : queryDesc->planstate->state->es_query_dsa = area;
3092 1556 [ + + ]: 1816 : if (DsaPointerIsValid(fpes->param_exec))
1557 : : {
1558 : : char *paramexec_space;
1559 : :
1560 : 48 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1561 : 48 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1562 : : }
andres@anarazel.de 1563 : 1816 : pwcxt.toc = toc;
1564 : 1816 : pwcxt.seg = seg;
1565 : 1816 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1566 : :
1567 : : /* Pass down any tuple bound */
3171 rhaas@postgresql.org 1568 : 1816 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1569 : :
1570 : : /*
1571 : : * Prepare to track buffer/WAL usage during query execution.
1572 : : *
1573 : : * We do this after starting up the executor to match what happens in the
1574 : : * leader, which also doesn't count buffer accesses and WAL activity that
1575 : : * occur during executor startup.
1576 : : */
2832 akapila@postgresql.o 1577 : 1816 : InstrStartParallelQuery();
1578 : :
1579 : : /*
1580 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1581 : : * more tuples than that.
1582 : : */
3171 rhaas@postgresql.org 1583 : 1816 : ExecutorRun(queryDesc,
1584 : : ForwardScanDirection,
512 tgl@sss.pgh.pa.us 1585 : 1816 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1586 : :
1587 : : /* Shut down the executor */
3872 rhaas@postgresql.org 1588 : 1808 : ExecutorFinish(queryDesc);
1589 : :
1590 : : /* Report buffer/WAL usage during parallel execution. */
3256 tgl@sss.pgh.pa.us 1591 : 1808 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
2222 akapila@postgresql.o 1592 : 1808 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1593 : 1808 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1594 : 1808 : &wal_usage[ParallelWorkerNumber]);
1595 : :
1596 : : /* Report instrumentation data if any instrumentation options are set. */
3872 rhaas@postgresql.org 1597 [ + + ]: 1808 : if (instrumentation != NULL)
1598 : 484 : ExecParallelReportInstrumentation(queryDesc->planstate,
1599 : : instrumentation);
1600 : :
1601 : : /* Report JIT instrumentation data if any */
2779 andres@anarazel.de 1602 [ - + - - ]: 1808 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1603 : : {
2779 andres@anarazel.de 1604 [ # # ]:UBC 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1605 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1606 : 0 : queryDesc->estate->es_jit->instr;
1607 : : }
1608 : :
1609 : : /* Must do this after capturing instrumentation. */
3870 rhaas@postgresql.org 1610 :CBC 1808 : ExecutorEnd(queryDesc);
1611 : :
1612 : : /* Cleanup. */
3424 1613 : 1808 : dsa_detach(area);
3872 1614 : 1808 : FreeQueryDesc(queryDesc);
3162 peter_e@gmx.net 1615 : 1808 : receiver->rDestroy(receiver);
3872 rhaas@postgresql.org 1616 : 1808 : }
|