Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * execParallel.c
4 : : * Support routines for parallel execution.
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * This file contains routines that are intended to support setting up,
10 : : * using, and tearing down a ParallelContext from within the PostgreSQL
11 : : * executor. The ParallelContext machinery will handle starting the
12 : : * workers and ensuring that their state generally matches that of the
13 : : * leader; see src/backend/access/transam/README.parallel for details.
14 : : * However, we must save and restore relevant executor state, such as
15 : : * any ParamListInfo associated with the query, buffer/WAL usage info, and
16 : : * the actual plan to be passed down to the worker.
17 : : *
18 : : * IDENTIFICATION
19 : : * src/backend/executor/execParallel.c
20 : : *
21 : : *-------------------------------------------------------------------------
22 : : */
23 : :
24 : : #include "postgres.h"
25 : :
26 : : #include "executor/execParallel.h"
27 : : #include "executor/executor.h"
28 : : #include "executor/nodeAgg.h"
29 : : #include "executor/nodeAppend.h"
30 : : #include "executor/nodeBitmapHeapscan.h"
31 : : #include "executor/nodeBitmapIndexscan.h"
32 : : #include "executor/nodeCustom.h"
33 : : #include "executor/nodeForeignscan.h"
34 : : #include "executor/nodeHash.h"
35 : : #include "executor/nodeHashjoin.h"
36 : : #include "executor/nodeIncrementalSort.h"
37 : : #include "executor/nodeIndexonlyscan.h"
38 : : #include "executor/nodeIndexscan.h"
39 : : #include "executor/nodeMemoize.h"
40 : : #include "executor/nodeSeqscan.h"
41 : : #include "executor/nodeSort.h"
42 : : #include "executor/nodeSubplan.h"
43 : : #include "executor/tqueue.h"
44 : : #include "jit/jit.h"
45 : : #include "nodes/nodeFuncs.h"
46 : : #include "pgstat.h"
47 : : #include "tcop/tcopprot.h"
48 : : #include "utils/datum.h"
49 : : #include "utils/dsa.h"
50 : : #include "utils/lsyscache.h"
51 : : #include "utils/snapmgr.h"
52 : :
53 : : /*
54 : : * Magic numbers for parallel executor communication. We use constants
55 : : * greater than any 32-bit integer here so that values < 2^32 can be used
56 : : * by individual parallel nodes to store their own state.
57 : : */
58 : : #define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
59 : : #define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
60 : : #define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
61 : : #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
62 : : #define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
63 : : #define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
64 : : #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
65 : : #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
66 : : #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
67 : : #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
68 : :
69 : : #define PARALLEL_TUPLE_QUEUE_SIZE 65536
70 : :
71 : : /*
72 : : * Fixed-size random stuff that we need to pass to parallel workers.
73 : : */
74 : : typedef struct FixedParallelExecutorState
75 : : {
76 : : int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
77 : : dsa_pointer param_exec;
78 : : int eflags;
79 : : int jit_flags;
80 : : } FixedParallelExecutorState;
81 : :
82 : : /*
83 : : * DSM structure for accumulating per-PlanState instrumentation.
84 : : *
85 : : * instrument_options: Same meaning here as in instrument.c.
86 : : *
87 : : * instrument_offset: Offset, relative to the start of this structure,
88 : : * of the first Instrumentation object. This will depend on the length of
89 : : * the plan_node_id array.
90 : : *
91 : : * num_workers: Number of workers.
92 : : *
93 : : * num_plan_nodes: Number of plan nodes.
94 : : *
95 : : * plan_node_id: Array of plan nodes for which we are gathering instrumentation
96 : : * from parallel workers. The length of this array is given by num_plan_nodes.
97 : : */
98 : : struct SharedExecutorInstrumentation
99 : : {
100 : : int instrument_options;
101 : : int instrument_offset;
102 : : int num_workers;
103 : : int num_plan_nodes;
104 : : int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
105 : : /* array of num_plan_nodes * num_workers Instrumentation objects follows */
106 : : };
107 : : #define GetInstrumentationArray(sei) \
108 : : (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
109 : : (Instrumentation *) (((char *) sei) + sei->instrument_offset))
110 : :
111 : : /* Context object for ExecParallelEstimate. */
112 : : typedef struct ExecParallelEstimateContext
113 : : {
114 : : ParallelContext *pcxt;
115 : : int nnodes;
116 : : } ExecParallelEstimateContext;
117 : :
118 : : /* Context object for ExecParallelInitializeDSM. */
119 : : typedef struct ExecParallelInitializeDSMContext
120 : : {
121 : : ParallelContext *pcxt;
122 : : SharedExecutorInstrumentation *instrumentation;
123 : : int nnodes;
124 : : } ExecParallelInitializeDSMContext;
125 : :
126 : : /* Helper functions that run in the parallel leader. */
127 : : static char *ExecSerializePlan(Plan *plan, EState *estate);
128 : : static bool ExecParallelEstimate(PlanState *planstate,
129 : : ExecParallelEstimateContext *e);
130 : : static bool ExecParallelInitializeDSM(PlanState *planstate,
131 : : ExecParallelInitializeDSMContext *d);
132 : : static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
133 : : bool reinitialize);
134 : : static bool ExecParallelReInitializeDSM(PlanState *planstate,
135 : : ParallelContext *pcxt);
136 : : static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
137 : : SharedExecutorInstrumentation *instrumentation);
138 : :
139 : : /* Helper function that runs in the parallel worker. */
140 : : static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
141 : :
142 : : /*
143 : : * Create a serialized representation of the plan to be sent to each worker.
144 : : */
145 : : static char *
3629 rhaas@postgresql.org 146 :CBC 360 : ExecSerializePlan(Plan *plan, EState *estate)
147 : : {
148 : : PlannedStmt *pstmt;
149 : : ListCell *lc;
150 : :
151 : : /* We can't scribble on the original plan, so make a copy. */
3631 152 : 360 : plan = copyObject(plan);
153 : :
154 : : /*
155 : : * The worker will start its own copy of the executor, and that copy will
156 : : * insert a junk filter if the toplevel node has any resjunk entries. We
157 : : * don't want that to happen, because while resjunk columns shouldn't be
158 : : * sent back to the user, here the tuples are coming back to another
159 : : * backend which may very well need them. So mutate the target list
160 : : * accordingly. This is sort of a hack; there might be better ways to do
161 : : * this...
162 : : */
3069 tgl@sss.pgh.pa.us 163 [ + + + + : 991 : foreach(lc, plan->targetlist)
+ + ]
164 : : {
165 : 631 : TargetEntry *tle = lfirst_node(TargetEntry, lc);
166 : :
3631 rhaas@postgresql.org 167 : 631 : tle->resjunk = false;
168 : : }
169 : :
170 : : /*
171 : : * Create a dummy PlannedStmt. Most of the fields don't need to be valid
172 : : * for our purposes, but the worker will need at least a minimal
173 : : * PlannedStmt to start the executor.
174 : : */
175 : 360 : pstmt = makeNode(PlannedStmt);
176 : 360 : pstmt->commandType = CMD_SELECT;
1600 bruce@momjian.us 177 : 360 : pstmt->queryId = pgstat_get_my_query_id();
166 michael@paquier.xyz 178 : 360 : pstmt->planId = pgstat_get_my_plan_id();
3340 tgl@sss.pgh.pa.us 179 : 360 : pstmt->hasReturning = false;
180 : 360 : pstmt->hasModifyingCTE = false;
181 : 360 : pstmt->canSetTag = true;
182 : 360 : pstmt->transientPlan = false;
183 : 360 : pstmt->dependsOnRole = false;
184 : 360 : pstmt->parallelModeNeeded = false;
3631 rhaas@postgresql.org 185 : 360 : pstmt->planTree = plan;
219 amitlan@postgresql.o 186 : 360 : pstmt->partPruneInfos = estate->es_part_prune_infos;
3629 rhaas@postgresql.org 187 : 360 : pstmt->rtable = estate->es_range_table;
211 amitlan@postgresql.o 188 : 360 : pstmt->unprunableRelids = estate->es_unpruned_relids;
1005 alvherre@alvh.no-ip. 189 : 360 : pstmt->permInfos = estate->es_rteperminfos;
3631 rhaas@postgresql.org 190 : 360 : pstmt->resultRelations = NIL;
2096 tgl@sss.pgh.pa.us 191 : 360 : pstmt->appendRelations = NIL;
37 michael@paquier.xyz 192 :GNC 360 : pstmt->planOrigin = PLAN_STMT_INTERNAL;
193 : :
194 : : /*
195 : : * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
196 : : * for unsafe ones (so that the list indexes of the safe ones are
197 : : * preserved). This positively ensures that the worker won't try to run,
198 : : * or even do ExecInitNode on, an unsafe subplan. That's important to
199 : : * protect, eg, non-parallel-aware FDWs from getting into trouble.
200 : : */
3069 tgl@sss.pgh.pa.us 201 :CBC 360 : pstmt->subplans = NIL;
202 [ + + + + : 387 : foreach(lc, estate->es_plannedstmt->subplans)
+ + ]
203 : : {
204 : 27 : Plan *subplan = (Plan *) lfirst(lc);
205 : :
206 [ + - + + ]: 27 : if (subplan && !subplan->parallel_safe)
207 : 6 : subplan = NULL;
208 : 27 : pstmt->subplans = lappend(pstmt->subplans, subplan);
209 : : }
210 : :
3631 rhaas@postgresql.org 211 : 360 : pstmt->rewindPlanIDs = NULL;
212 : 360 : pstmt->rowMarks = NIL;
213 : 360 : pstmt->relationOids = NIL;
214 : 360 : pstmt->invalItems = NIL; /* workers can't replan anyway... */
2854 215 : 360 : pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
3157 tgl@sss.pgh.pa.us 216 : 360 : pstmt->utilityStmt = NULL;
217 : 360 : pstmt->stmt_location = -1;
218 : 360 : pstmt->stmt_len = -1;
219 : :
220 : : /* Return serialized copy of our dummy PlannedStmt. */
3631 rhaas@postgresql.org 221 : 360 : return nodeToString(pstmt);
222 : : }
223 : :
224 : : /*
225 : : * Parallel-aware plan nodes (and occasionally others) may need some state
226 : : * which is shared across all parallel workers. Before we size the DSM, give
227 : : * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
228 : : * &pcxt->estimator.
229 : : *
230 : : * While we're at it, count the number of PlanState nodes in the tree, so
231 : : * we know how many Instrumentation structures we need.
232 : : */
233 : : static bool
234 : 1482 : ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
235 : : {
236 [ - + ]: 1482 : if (planstate == NULL)
3631 rhaas@postgresql.org 237 :UBC 0 : return false;
238 : :
239 : : /* Count this node. */
3631 rhaas@postgresql.org 240 :CBC 1482 : e->nnodes++;
241 : :
2930 242 [ + + + + : 1482 : switch (nodeTag(planstate))
- + - + +
+ + - + +
+ ]
243 : : {
244 : 573 : case T_SeqScanState:
245 [ + + ]: 573 : if (planstate->plan->parallel_aware)
3517 246 : 454 : ExecSeqScanEstimate((SeqScanState *) planstate,
247 : : e->pcxt);
2930 248 : 573 : break;
249 : 147 : case T_IndexScanState:
250 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 251 : 147 : ExecIndexScanEstimate((IndexScanState *) planstate,
252 : : e->pcxt);
2930 rhaas@postgresql.org 253 : 147 : break;
254 : 29 : case T_IndexOnlyScanState:
255 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 256 : 29 : ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
257 : : e->pcxt);
258 : 29 : break;
259 : 10 : case T_BitmapIndexScanState:
260 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
261 : 10 : ExecBitmapIndexScanEstimate((BitmapIndexScanState *) planstate,
262 : : e->pcxt);
2930 rhaas@postgresql.org 263 : 10 : break;
2930 rhaas@postgresql.org 264 :UBC 0 : case T_ForeignScanState:
265 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 266 : 0 : ExecForeignScanEstimate((ForeignScanState *) planstate,
267 : : e->pcxt);
2930 268 : 0 : break;
2832 rhaas@postgresql.org 269 :CBC 93 : case T_AppendState:
270 [ + + ]: 93 : if (planstate->plan->parallel_aware)
271 : 69 : ExecAppendEstimate((AppendState *) planstate,
272 : : e->pcxt);
273 : 93 : break;
2930 rhaas@postgresql.org 274 :UBC 0 : case T_CustomScanState:
275 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 276 : 0 : ExecCustomScanEstimate((CustomScanState *) planstate,
277 : : e->pcxt);
2930 278 : 0 : break;
2930 rhaas@postgresql.org 279 :CBC 10 : case T_BitmapHeapScanState:
280 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3104 281 : 9 : ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
282 : : e->pcxt);
2930 283 : 10 : break;
2817 andres@anarazel.de 284 : 96 : case T_HashJoinState:
285 [ + + ]: 96 : if (planstate->plan->parallel_aware)
286 : 60 : ExecHashJoinEstimate((HashJoinState *) planstate,
287 : : e->pcxt);
288 : 96 : break;
2832 289 : 96 : case T_HashState:
290 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
291 : 96 : ExecHashEstimate((HashState *) planstate, e->pcxt);
292 : 96 : break;
2930 rhaas@postgresql.org 293 : 76 : case T_SortState:
294 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
295 : 76 : ExecSortEstimate((SortState *) planstate, e->pcxt);
2929 tgl@sss.pgh.pa.us 296 : 76 : break;
1979 tomas.vondra@postgre 297 :UBC 0 : case T_IncrementalSortState:
298 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
299 : 0 : ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
300 : 0 : break;
1905 drowley@postgresql.o 301 :CBC 278 : case T_AggState:
302 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
303 : 278 : ExecAggEstimate((AggState *) planstate, e->pcxt);
304 : 278 : break;
1515 305 : 3 : case T_MemoizeState:
306 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
307 : 3 : ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
1618 308 : 3 : break;
2930 rhaas@postgresql.org 309 : 71 : default:
310 : 71 : break;
311 : : }
312 : :
3631 313 : 1482 : return planstate_tree_walker(planstate, ExecParallelEstimate, e);
314 : : }
315 : :
316 : : /*
317 : : * Estimate the amount of space required to serialize the indicated parameters.
318 : : */
319 : : static Size
2851 320 : 12 : EstimateParamExecSpace(EState *estate, Bitmapset *params)
321 : : {
322 : : int paramid;
323 : 12 : Size sz = sizeof(int);
324 : :
325 : 12 : paramid = -1;
326 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
327 : : {
328 : : Oid typeOid;
329 : : int16 typLen;
330 : : bool typByVal;
331 : : ParamExecData *prm;
332 : :
333 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
334 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
335 : : paramid);
336 : :
337 : 15 : sz = add_size(sz, sizeof(int)); /* space for paramid */
338 : :
339 : : /* space for datum/isnull */
340 [ + - ]: 15 : if (OidIsValid(typeOid))
341 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
342 : : else
343 : : {
344 : : /* If no type OID, assume by-value, like copyParamList does. */
2851 rhaas@postgresql.org 345 :UBC 0 : typLen = sizeof(Datum);
346 : 0 : typByVal = true;
347 : : }
2851 rhaas@postgresql.org 348 :CBC 15 : sz = add_size(sz,
349 : 15 : datumEstimateSpace(prm->value, prm->isnull,
350 : : typByVal, typLen));
351 : : }
352 : 12 : return sz;
353 : : }
354 : :
355 : : /*
356 : : * Serialize specified PARAM_EXEC parameters.
357 : : *
358 : : * We write the number of parameters first, as a 4-byte integer, and then
359 : : * write details for each parameter in turn. The details for each parameter
360 : : * consist of a 4-byte paramid (location of param in execution time internal
361 : : * parameter array) and then the datum as serialized by datumSerialize().
362 : : */
363 : : static dsa_pointer
2819 364 : 12 : SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
365 : : {
366 : : Size size;
367 : : int nparams;
368 : : int paramid;
369 : : ParamExecData *prm;
370 : : dsa_pointer handle;
371 : : char *start_address;
372 : :
373 : : /* Allocate enough space for the current parameter values. */
2851 374 : 12 : size = EstimateParamExecSpace(estate, params);
2819 375 : 12 : handle = dsa_allocate(area, size);
376 : 12 : start_address = dsa_get_address(area, handle);
377 : :
378 : : /* First write the number of parameters as a 4-byte integer. */
2851 379 : 12 : nparams = bms_num_members(params);
380 : 12 : memcpy(start_address, &nparams, sizeof(int));
381 : 12 : start_address += sizeof(int);
382 : :
383 : : /* Write details for each parameter in turn. */
384 : 12 : paramid = -1;
385 [ + + ]: 27 : while ((paramid = bms_next_member(params, paramid)) >= 0)
386 : : {
387 : : Oid typeOid;
388 : : int16 typLen;
389 : : bool typByVal;
390 : :
391 : 15 : prm = &(estate->es_param_exec_vals[paramid]);
392 : 15 : typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
393 : : paramid);
394 : :
395 : : /* Write paramid. */
396 : 15 : memcpy(start_address, ¶mid, sizeof(int));
397 : 15 : start_address += sizeof(int);
398 : :
399 : : /* Write datum/isnull */
400 [ + - ]: 15 : if (OidIsValid(typeOid))
401 : 15 : get_typlenbyval(typeOid, &typLen, &typByVal);
402 : : else
403 : : {
404 : : /* If no type OID, assume by-value, like copyParamList does. */
2851 rhaas@postgresql.org 405 :UBC 0 : typLen = sizeof(Datum);
406 : 0 : typByVal = true;
407 : : }
2851 rhaas@postgresql.org 408 :CBC 15 : datumSerialize(prm->value, prm->isnull, typByVal, typLen,
409 : : &start_address);
410 : : }
411 : :
412 : 12 : return handle;
413 : : }
414 : :
415 : : /*
416 : : * Restore specified PARAM_EXEC parameters.
417 : : */
418 : : static void
419 : 36 : RestoreParamExecParams(char *start_address, EState *estate)
420 : : {
421 : : int nparams;
422 : : int i;
423 : : int paramid;
424 : :
425 : 36 : memcpy(&nparams, start_address, sizeof(int));
426 : 36 : start_address += sizeof(int);
427 : :
428 [ + + ]: 78 : for (i = 0; i < nparams; i++)
429 : : {
430 : : ParamExecData *prm;
431 : :
432 : : /* Read paramid */
433 : 42 : memcpy(¶mid, start_address, sizeof(int));
434 : 42 : start_address += sizeof(int);
435 : 42 : prm = &(estate->es_param_exec_vals[paramid]);
436 : :
437 : : /* Read datum/isnull. */
438 : 42 : prm->value = datumRestore(&start_address, &prm->isnull);
439 : 42 : prm->execPlan = NULL;
440 : : }
441 : 36 : }
442 : :
443 : : /*
444 : : * Initialize the dynamic shared memory segment that will be used to control
445 : : * parallel execution.
446 : : */
447 : : static bool
3631 448 : 1482 : ExecParallelInitializeDSM(PlanState *planstate,
449 : : ExecParallelInitializeDSMContext *d)
450 : : {
451 [ - + ]: 1482 : if (planstate == NULL)
3631 rhaas@postgresql.org 452 :UBC 0 : return false;
453 : :
454 : : /* If instrumentation is enabled, initialize slot for this node. */
3631 rhaas@postgresql.org 455 [ + + ]:CBC 1482 : if (d->instrumentation != NULL)
3559 456 : 513 : d->instrumentation->plan_node_id[d->nnodes] =
457 : 513 : planstate->plan->plan_node_id;
458 : :
459 : : /* Count this node. */
3631 460 : 1482 : d->nnodes++;
461 : :
462 : : /*
463 : : * Call initializers for DSM-using plan nodes.
464 : : *
465 : : * Most plan nodes won't do anything here, but plan nodes that allocated
466 : : * DSM may need to initialize shared state in the DSM before parallel
467 : : * workers are launched. They can allocate the space they previously
468 : : * estimated using shm_toc_allocate, and add the keys they previously
469 : : * estimated using shm_toc_insert, in each case targeting pcxt->toc.
470 : : */
2930 471 [ + + + + : 1482 : switch (nodeTag(planstate))
- + - + +
+ + - + +
+ ]
472 : : {
473 : 573 : case T_SeqScanState:
474 [ + + ]: 573 : if (planstate->plan->parallel_aware)
3517 475 : 454 : ExecSeqScanInitializeDSM((SeqScanState *) planstate,
476 : : d->pcxt);
2930 477 : 573 : break;
478 : 147 : case T_IndexScanState:
479 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 480 : 147 : ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
2930 rhaas@postgresql.org 481 : 147 : break;
482 : 29 : case T_IndexOnlyScanState:
483 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 484 : 29 : ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
485 : : d->pcxt);
486 : 29 : break;
487 : 10 : case T_BitmapIndexScanState:
488 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
489 : 10 : ExecBitmapIndexScanInitializeDSM((BitmapIndexScanState *) planstate, d->pcxt);
2930 rhaas@postgresql.org 490 : 10 : break;
2930 rhaas@postgresql.org 491 :UBC 0 : case T_ForeignScanState:
492 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 493 : 0 : ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
494 : : d->pcxt);
2930 495 : 0 : break;
2832 rhaas@postgresql.org 496 :CBC 93 : case T_AppendState:
497 [ + + ]: 93 : if (planstate->plan->parallel_aware)
498 : 69 : ExecAppendInitializeDSM((AppendState *) planstate,
499 : : d->pcxt);
500 : 93 : break;
2930 rhaas@postgresql.org 501 :UBC 0 : case T_CustomScanState:
502 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 503 : 0 : ExecCustomScanInitializeDSM((CustomScanState *) planstate,
504 : : d->pcxt);
2930 505 : 0 : break;
2930 rhaas@postgresql.org 506 :CBC 10 : case T_BitmapHeapScanState:
507 [ + + ]: 10 : if (planstate->plan->parallel_aware)
3104 508 : 9 : ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
509 : : d->pcxt);
2930 510 : 10 : break;
2817 andres@anarazel.de 511 : 96 : case T_HashJoinState:
512 [ + + ]: 96 : if (planstate->plan->parallel_aware)
513 : 60 : ExecHashJoinInitializeDSM((HashJoinState *) planstate,
514 : : d->pcxt);
515 : 96 : break;
2832 516 : 96 : case T_HashState:
517 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
518 : 96 : ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
519 : 96 : break;
2930 rhaas@postgresql.org 520 : 76 : case T_SortState:
521 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
522 : 76 : ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
2929 tgl@sss.pgh.pa.us 523 : 76 : break;
1979 tomas.vondra@postgre 524 :UBC 0 : case T_IncrementalSortState:
525 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
526 : 0 : ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
527 : 0 : break;
1905 drowley@postgresql.o 528 :CBC 278 : case T_AggState:
529 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
530 : 278 : ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
531 : 278 : break;
1515 532 : 3 : case T_MemoizeState:
533 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
534 : 3 : ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
1618 535 : 3 : break;
2930 rhaas@postgresql.org 536 : 71 : default:
537 : 71 : break;
538 : : }
539 : :
3631 540 : 1482 : return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
541 : : }
542 : :
543 : : /*
544 : : * It sets up the response queues for backend workers to return tuples
545 : : * to the main backend and start the workers.
546 : : */
547 : : static shm_mq_handle **
3599 548 : 489 : ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
549 : : {
550 : : shm_mq_handle **responseq;
551 : : char *tqueuespace;
552 : : int i;
553 : :
554 : : /* Skip this if no workers. */
3631 555 [ - + ]: 489 : if (pcxt->nworkers == 0)
3631 rhaas@postgresql.org 556 :UBC 0 : return NULL;
557 : :
558 : : /* Allocate memory for shared memory queue handles. */
559 : : responseq = (shm_mq_handle **)
3631 rhaas@postgresql.org 560 :CBC 489 : palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
561 : :
562 : : /*
563 : : * If not reinitializing, allocate space from the DSM for the queues;
564 : : * otherwise, find the already allocated space.
565 : : */
3599 566 [ + + ]: 489 : if (!reinitialize)
567 : : tqueuespace =
568 : 360 : shm_toc_allocate(pcxt->toc,
569 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
3410 570 : 360 : pcxt->nworkers));
571 : : else
3015 tgl@sss.pgh.pa.us 572 : 129 : tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
573 : :
574 : : /* Create the queues, and become the receiver for each. */
3631 rhaas@postgresql.org 575 [ + + ]: 1796 : for (i = 0; i < pcxt->nworkers; ++i)
576 : : {
577 : : shm_mq *mq;
578 : :
3410 579 : 1307 : mq = shm_mq_create(tqueuespace +
580 : 1307 : ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
581 : : (Size) PARALLEL_TUPLE_QUEUE_SIZE);
582 : :
3631 583 : 1307 : shm_mq_set_receiver(mq, MyProc);
584 : 1307 : responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
585 : : }
586 : :
587 : : /* Add array of queues to shm_toc, so others can find it. */
3599 588 [ + + ]: 489 : if (!reinitialize)
589 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
590 : :
591 : : /* Return array of handles. */
3631 592 : 489 : return responseq;
593 : : }
594 : :
595 : : /*
596 : : * Sets up the required infrastructure for backend workers to perform
597 : : * execution and return results to the main backend.
598 : : */
599 : : ParallelExecutorInfo *
2851 600 : 360 : ExecInitParallelPlan(PlanState *planstate, EState *estate,
601 : : Bitmapset *sendParams, int nworkers,
602 : : int64 tuples_needed)
603 : : {
604 : : ParallelExecutorInfo *pei;
605 : : ParallelContext *pcxt;
606 : : ExecParallelEstimateContext e;
607 : : ExecParallelInitializeDSMContext d;
608 : : FixedParallelExecutorState *fpes;
609 : : char *pstmt_data;
610 : : char *pstmt_space;
611 : : char *paramlistinfo_space;
612 : : BufferUsage *bufusage_space;
613 : : WalUsage *walusage_space;
3631 614 : 360 : SharedExecutorInstrumentation *instrumentation = NULL;
2538 andres@anarazel.de 615 : 360 : SharedJitInstrumentation *jit_instrumentation = NULL;
616 : : int pstmt_len;
617 : : int paramlistinfo_len;
3631 rhaas@postgresql.org 618 : 360 : int instrumentation_len = 0;
2538 andres@anarazel.de 619 : 360 : int jit_instrumentation_len = 0;
3559 rhaas@postgresql.org 620 : 360 : int instrument_offset = 0;
3183 621 : 360 : Size dsa_minsize = dsa_minimum_size();
622 : : char *query_string;
623 : : int query_len;
624 : :
625 : : /*
626 : : * Force any initplan outputs that we're going to pass to workers to be
627 : : * evaluated, if they weren't already.
628 : : *
629 : : * For simplicity, we use the EState's per-output-tuple ExprContext here.
630 : : * That risks intra-query memory leakage, since we might pass through here
631 : : * many times before that ExprContext gets reset; but ExecSetParamPlan
632 : : * doesn't normally leak any memory in the context (see its comments), so
633 : : * it doesn't seem worth complicating this function's API to pass it a
634 : : * shorter-lived ExprContext. This might need to change someday.
635 : : */
2548 tgl@sss.pgh.pa.us 636 [ + + ]: 360 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
637 : :
638 : : /* Allocate object for return value. */
3631 rhaas@postgresql.org 639 : 360 : pei = palloc0(sizeof(ParallelExecutorInfo));
3580 640 : 360 : pei->finished = false;
3631 641 : 360 : pei->planstate = planstate;
642 : :
643 : : /* Fix up and serialize plan to be sent to workers. */
3629 644 : 360 : pstmt_data = ExecSerializePlan(planstate->plan, estate);
645 : :
646 : : /* Create a parallel context. */
2367 tmunro@postgresql.or 647 : 360 : pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
3631 rhaas@postgresql.org 648 : 360 : pei->pcxt = pcxt;
649 : :
650 : : /*
651 : : * Before telling the parallel context to create a dynamic shared memory
652 : : * segment, we need to figure out how big it should be. Estimate space
653 : : * for the various things we need to store.
654 : : */
655 : :
656 : : /* Estimate space for fixed-size state. */
2930 657 : 360 : shm_toc_estimate_chunk(&pcxt->estimator,
658 : : sizeof(FixedParallelExecutorState));
659 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
660 : :
661 : : /* Estimate space for query text. */
1605 tgl@sss.pgh.pa.us 662 : 360 : query_len = strlen(estate->es_sourceText);
2817 rhaas@postgresql.org 663 : 360 : shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
3118 664 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
665 : :
666 : : /* Estimate space for serialized PlannedStmt. */
3631 667 : 360 : pstmt_len = strlen(pstmt_data) + 1;
668 : 360 : shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
669 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
670 : :
671 : : /* Estimate space for serialized ParamListInfo. */
2851 672 : 360 : paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
673 : 360 : shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
3631 674 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
675 : :
676 : : /*
677 : : * Estimate space for BufferUsage.
678 : : *
679 : : * If EXPLAIN is not in use and there are no extensions loaded that care,
680 : : * we could skip this. But we have no way of knowing whether anyone's
681 : : * looking at pgBufferUsage, so do it unconditionally.
682 : : */
683 : 360 : shm_toc_estimate_chunk(&pcxt->estimator,
684 : : mul_size(sizeof(BufferUsage), pcxt->nworkers));
685 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
686 : :
687 : : /*
688 : : * Same thing for WalUsage.
689 : : */
1981 akapila@postgresql.o 690 : 360 : shm_toc_estimate_chunk(&pcxt->estimator,
691 : : mul_size(sizeof(WalUsage), pcxt->nworkers));
692 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
693 : :
694 : : /* Estimate space for tuple queues. */
3631 rhaas@postgresql.org 695 : 360 : shm_toc_estimate_chunk(&pcxt->estimator,
696 : : mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
697 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
698 : :
699 : : /*
700 : : * Give parallel-aware nodes a chance to add to the estimates, and get a
701 : : * count of how many PlanState nodes there are.
702 : : */
703 : 360 : e.pcxt = pcxt;
704 : 360 : e.nnodes = 0;
705 : 360 : ExecParallelEstimate(planstate, &e);
706 : :
707 : : /* Estimate space for instrumentation, if required. */
708 [ + + ]: 360 : if (estate->es_instrument)
709 : : {
710 : 90 : instrumentation_len =
711 : : offsetof(SharedExecutorInstrumentation, plan_node_id) +
3413 712 : 90 : sizeof(int) * e.nnodes;
3559 713 : 90 : instrumentation_len = MAXALIGN(instrumentation_len);
714 : 90 : instrument_offset = instrumentation_len;
3410 715 : 90 : instrumentation_len +=
716 : 90 : mul_size(sizeof(Instrumentation),
717 : 90 : mul_size(e.nnodes, nworkers));
3631 718 : 90 : shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
719 : 90 : shm_toc_estimate_keys(&pcxt->estimator, 1);
720 : :
721 : : /* Estimate space for JIT instrumentation, if required. */
2538 andres@anarazel.de 722 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
723 : : {
724 : 12 : jit_instrumentation_len =
725 : 12 : offsetof(SharedJitInstrumentation, jit_instr) +
726 : : sizeof(JitInstrumentation) * nworkers;
727 : 12 : shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
728 : 12 : shm_toc_estimate_keys(&pcxt->estimator, 1);
729 : : }
730 : : }
731 : :
732 : : /* Estimate space for DSA area. */
3183 rhaas@postgresql.org 733 : 360 : shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
734 : 360 : shm_toc_estimate_keys(&pcxt->estimator, 1);
735 : :
736 : : /*
737 : : * InitializeParallelDSM() passes the active snapshot to the parallel
738 : : * worker, which uses it to set es_snapshot. Make sure we don't set
739 : : * es_snapshot differently in the child.
740 : : */
541 heikki.linnakangas@i 741 [ - + ]: 360 : Assert(GetActiveSnapshot() == estate->es_snapshot);
742 : :
743 : : /* Everyone's had a chance to ask for space, so now create the DSM. */
3631 rhaas@postgresql.org 744 : 360 : InitializeParallelDSM(pcxt);
745 : :
746 : : /*
747 : : * OK, now we have a dynamic shared memory segment, and it should be big
748 : : * enough to store all of the data we estimated we would want to put into
749 : : * it, plus whatever general stuff (not specifically executor-related) the
750 : : * ParallelContext itself needs to store there. None of the space we
751 : : * asked for has been allocated or initialized yet, though, so do that.
752 : : */
753 : :
754 : : /* Store fixed-size state. */
2930 755 : 360 : fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
756 : 360 : fpes->tuples_needed = tuples_needed;
2851 757 : 360 : fpes->param_exec = InvalidDsaPointer;
2847 758 : 360 : fpes->eflags = estate->es_top_eflags;
2725 andres@anarazel.de 759 : 360 : fpes->jit_flags = estate->es_jit_flags;
2930 rhaas@postgresql.org 760 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
761 : :
762 : : /* Store query string */
2817 763 : 360 : query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
1605 tgl@sss.pgh.pa.us 764 : 360 : memcpy(query_string, estate->es_sourceText, query_len + 1);
3118 rhaas@postgresql.org 765 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
766 : :
767 : : /* Store serialized PlannedStmt. */
3631 768 : 360 : pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
769 : 360 : memcpy(pstmt_space, pstmt_data, pstmt_len);
770 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
771 : :
772 : : /* Store serialized ParamListInfo. */
2851 773 : 360 : paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
774 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
775 : 360 : SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space);
776 : :
777 : : /* Allocate space for each worker's BufferUsage; no need to initialize. */
3631 778 : 360 : bufusage_space = shm_toc_allocate(pcxt->toc,
2999 tgl@sss.pgh.pa.us 779 : 360 : mul_size(sizeof(BufferUsage), pcxt->nworkers));
3631 rhaas@postgresql.org 780 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
781 : 360 : pei->buffer_usage = bufusage_space;
782 : :
783 : : /* Same for WalUsage. */
1981 akapila@postgresql.o 784 : 360 : walusage_space = shm_toc_allocate(pcxt->toc,
785 : 360 : mul_size(sizeof(WalUsage), pcxt->nworkers));
786 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
787 : 360 : pei->wal_usage = walusage_space;
788 : :
789 : : /* Set up the tuple queues that the workers will write into. */
3599 rhaas@postgresql.org 790 : 360 : pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
791 : :
792 : : /* We don't need the TupleQueueReaders yet, though. */
2927 tgl@sss.pgh.pa.us 793 : 360 : pei->reader = NULL;
794 : :
795 : : /*
796 : : * If instrumentation options were supplied, allocate space for the data.
797 : : * It only gets partially initialized here; the rest happens during
798 : : * ExecParallelInitializeDSM.
799 : : */
3631 rhaas@postgresql.org 800 [ + + ]: 360 : if (estate->es_instrument)
801 : : {
802 : : Instrumentation *instrument;
803 : : int i;
804 : :
805 : 90 : instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
806 : 90 : instrumentation->instrument_options = estate->es_instrument;
3559 807 : 90 : instrumentation->instrument_offset = instrument_offset;
808 : 90 : instrumentation->num_workers = nworkers;
809 : 90 : instrumentation->num_plan_nodes = e.nnodes;
810 : 90 : instrument = GetInstrumentationArray(instrumentation);
811 [ + + ]: 930 : for (i = 0; i < nworkers * e.nnodes; ++i)
812 : 840 : InstrInit(&instrument[i], estate->es_instrument);
3631 813 : 90 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
814 : : instrumentation);
815 : 90 : pei->instrumentation = instrumentation;
816 : :
2538 andres@anarazel.de 817 [ + + ]: 90 : if (estate->es_jit_flags != PGJIT_NONE)
818 : : {
819 : 12 : jit_instrumentation = shm_toc_allocate(pcxt->toc,
820 : : jit_instrumentation_len);
821 : 12 : jit_instrumentation->num_workers = nworkers;
822 : 12 : memset(jit_instrumentation->jit_instr, 0,
823 : : sizeof(JitInstrumentation) * nworkers);
824 : 12 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
825 : : jit_instrumentation);
826 : 12 : pei->jit_instrumentation = jit_instrumentation;
827 : : }
828 : : }
829 : :
830 : : /*
831 : : * Create a DSA area that can be used by the leader and all workers.
832 : : * (However, if we failed to create a DSM and are using private memory
833 : : * instead, then skip this.)
834 : : */
3183 rhaas@postgresql.org 835 [ + - ]: 360 : if (pcxt->seg != NULL)
836 : : {
837 : : char *area_space;
838 : :
839 : 360 : area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
840 : 360 : shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
841 : 360 : pei->area = dsa_create_in_place(area_space, dsa_minsize,
842 : : LWTRANCHE_PARALLEL_QUERY_DSA,
843 : : pcxt->seg);
844 : :
845 : : /*
846 : : * Serialize parameters, if any, using DSA storage. We don't dare use
847 : : * the main parallel query DSM for this because we might relaunch
848 : : * workers after the values have changed (and thus the amount of
849 : : * storage required has changed).
850 : : */
2851 851 [ + + ]: 360 : if (!bms_is_empty(sendParams))
852 : : {
2819 853 : 12 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
854 : : pei->area);
2851 855 : 12 : fpes->param_exec = pei->param_exec;
856 : : }
857 : : }
858 : :
859 : : /*
860 : : * Give parallel-aware nodes a chance to initialize their shared data.
861 : : * This also initializes the elements of instrumentation->ps_instrument,
862 : : * if it exists.
863 : : */
3631 864 : 360 : d.pcxt = pcxt;
865 : 360 : d.instrumentation = instrumentation;
866 : 360 : d.nnodes = 0;
867 : :
868 : : /* Install our DSA area while initializing the plan. */
2819 869 : 360 : estate->es_query_dsa = pei->area;
3631 870 : 360 : ExecParallelInitializeDSM(planstate, &d);
2819 871 : 360 : estate->es_query_dsa = NULL;
872 : :
873 : : /*
874 : : * Make sure that the world hasn't shifted under our feet. This could
875 : : * probably just be an Assert(), but let's be conservative for now.
876 : : */
3631 877 [ - + ]: 360 : if (e.nnodes != d.nnodes)
3631 rhaas@postgresql.org 878 [ # # ]:UBC 0 : elog(ERROR, "inconsistent count of PlanState nodes");
879 : :
880 : : /* OK, we're ready to rock and roll. */
3631 rhaas@postgresql.org 881 :CBC 360 : return pei;
882 : : }
883 : :
884 : : /*
885 : : * Set up tuple queue readers to read the results of a parallel subplan.
886 : : *
887 : : * This is separate from ExecInitParallelPlan() because we can launch the
888 : : * worker processes and let them start doing something before we do this.
889 : : */
890 : : void
2914 andres@anarazel.de 891 : 480 : ExecParallelCreateReaders(ParallelExecutorInfo *pei)
892 : : {
2927 tgl@sss.pgh.pa.us 893 : 480 : int nworkers = pei->pcxt->nworkers_launched;
894 : : int i;
895 : :
896 [ - + ]: 480 : Assert(pei->reader == NULL);
897 : :
898 [ + - ]: 480 : if (nworkers > 0)
899 : : {
900 : 480 : pei->reader = (TupleQueueReader **)
901 : 480 : palloc(nworkers * sizeof(TupleQueueReader *));
902 : :
903 [ + + ]: 1747 : for (i = 0; i < nworkers; i++)
904 : : {
905 : 1267 : shm_mq_set_handle(pei->tqueue[i],
906 : 1267 : pei->pcxt->worker[i].bgwhandle);
2914 andres@anarazel.de 907 : 1267 : pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
908 : : }
909 : : }
2927 tgl@sss.pgh.pa.us 910 : 480 : }
911 : :
912 : : /*
913 : : * Re-initialize the parallel executor shared memory state before launching
914 : : * a fresh batch of workers.
915 : : */
916 : : void
2929 917 : 129 : ExecParallelReinitialize(PlanState *planstate,
918 : : ParallelExecutorInfo *pei,
919 : : Bitmapset *sendParams)
920 : : {
2851 rhaas@postgresql.org 921 : 129 : EState *estate = planstate->state;
922 : : FixedParallelExecutorState *fpes;
923 : :
924 : : /* Old workers must already be shut down */
2929 tgl@sss.pgh.pa.us 925 [ - + ]: 129 : Assert(pei->finished);
926 : :
927 : : /*
928 : : * Force any initplan outputs that we're going to pass to workers to be
929 : : * evaluated, if they weren't already (see comments in
930 : : * ExecInitParallelPlan).
931 : : */
2548 932 [ + - ]: 129 : ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
933 : :
2929 934 : 129 : ReinitializeParallelDSM(pei->pcxt);
935 : 129 : pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
2927 936 : 129 : pei->reader = NULL;
2929 937 : 129 : pei->finished = false;
938 : :
2851 rhaas@postgresql.org 939 : 129 : fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
940 : :
941 : : /* Free any serialized parameters from the last round. */
942 [ - + ]: 129 : if (DsaPointerIsValid(fpes->param_exec))
943 : : {
2819 rhaas@postgresql.org 944 :UBC 0 : dsa_free(pei->area, fpes->param_exec);
2851 945 : 0 : fpes->param_exec = InvalidDsaPointer;
946 : : }
947 : :
948 : : /* Serialize current parameter values if required. */
2851 rhaas@postgresql.org 949 [ - + ]:CBC 129 : if (!bms_is_empty(sendParams))
950 : : {
2819 rhaas@postgresql.org 951 :UBC 0 : pei->param_exec = SerializeParamExecParams(estate, sendParams,
952 : : pei->area);
2851 953 : 0 : fpes->param_exec = pei->param_exec;
954 : : }
955 : :
956 : : /* Traverse plan tree and let each child node reset associated state. */
2819 rhaas@postgresql.org 957 :CBC 129 : estate->es_query_dsa = pei->area;
2929 tgl@sss.pgh.pa.us 958 : 129 : ExecParallelReInitializeDSM(planstate, pei->pcxt);
2819 rhaas@postgresql.org 959 : 129 : estate->es_query_dsa = NULL;
2929 tgl@sss.pgh.pa.us 960 : 129 : }
961 : :
962 : : /*
963 : : * Traverse plan tree to reinitialize per-node dynamic shared memory state
964 : : */
965 : : static bool
966 : 333 : ExecParallelReInitializeDSM(PlanState *planstate,
967 : : ParallelContext *pcxt)
968 : : {
969 [ - + ]: 333 : if (planstate == NULL)
2929 tgl@sss.pgh.pa.us 970 :UBC 0 : return false;
971 : :
972 : : /*
973 : : * Call reinitializers for DSM-using plan nodes.
974 : : */
2929 tgl@sss.pgh.pa.us 975 [ + + + - :CBC 333 : switch (nodeTag(planstate))
- - + + +
+ ]
976 : : {
977 : 138 : case T_SeqScanState:
978 [ + + ]: 138 : if (planstate->plan->parallel_aware)
979 : 114 : ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
980 : : pcxt);
981 : 138 : break;
982 : 6 : case T_IndexScanState:
983 [ + - ]: 6 : if (planstate->plan->parallel_aware)
984 : 6 : ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
985 : : pcxt);
986 : 6 : break;
987 : 6 : case T_IndexOnlyScanState:
988 [ + - ]: 6 : if (planstate->plan->parallel_aware)
989 : 6 : ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
990 : : pcxt);
991 : 6 : break;
2929 tgl@sss.pgh.pa.us 992 :UBC 0 : case T_ForeignScanState:
993 [ # # ]: 0 : if (planstate->plan->parallel_aware)
994 : 0 : ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
995 : : pcxt);
996 : 0 : break;
2832 rhaas@postgresql.org 997 : 0 : case T_AppendState:
998 [ # # ]: 0 : if (planstate->plan->parallel_aware)
999 : 0 : ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
1000 : 0 : break;
2929 tgl@sss.pgh.pa.us 1001 : 0 : case T_CustomScanState:
1002 [ # # ]: 0 : if (planstate->plan->parallel_aware)
1003 : 0 : ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
1004 : : pcxt);
1005 : 0 : break;
2929 tgl@sss.pgh.pa.us 1006 :CBC 27 : case T_BitmapHeapScanState:
1007 [ + - ]: 27 : if (planstate->plan->parallel_aware)
1008 : 27 : ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
1009 : : pcxt);
1010 : 27 : break;
2817 andres@anarazel.de 1011 : 48 : case T_HashJoinState:
1012 [ + + ]: 48 : if (planstate->plan->parallel_aware)
1013 : 24 : ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
1014 : : pcxt);
1015 : 48 : break;
179 pg@bowt.ie 1016 : 90 : case T_BitmapIndexScanState:
1017 : : case T_HashState:
1018 : : case T_SortState:
1019 : : case T_IncrementalSortState:
1020 : : case T_MemoizeState:
1021 : : /* these nodes have DSM state, but no reinitialization is required */
2929 tgl@sss.pgh.pa.us 1022 : 90 : break;
1023 : :
1024 : 18 : default:
1025 : 18 : break;
1026 : : }
1027 : :
1028 : 333 : return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
1029 : : }
1030 : :
1031 : : /*
1032 : : * Copy instrumentation information about this node and its descendants from
1033 : : * dynamic shared memory.
1034 : : */
1035 : : static bool
3631 rhaas@postgresql.org 1036 : 513 : ExecParallelRetrieveInstrumentation(PlanState *planstate,
1037 : : SharedExecutorInstrumentation *instrumentation)
1038 : : {
1039 : : Instrumentation *instrument;
1040 : : int i;
1041 : : int n;
1042 : : int ibytes;
3376 1043 : 513 : int plan_node_id = planstate->plan->plan_node_id;
1044 : : MemoryContext oldcontext;
1045 : :
1046 : : /* Find the instrumentation for this node. */
3559 1047 [ + - ]: 2319 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1048 [ + + ]: 2319 : if (instrumentation->plan_node_id[i] == plan_node_id)
3631 1049 : 513 : break;
3559 1050 [ - + ]: 513 : if (i >= instrumentation->num_plan_nodes)
3631 rhaas@postgresql.org 1051 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1052 : :
1053 : : /* Accumulate the statistics from all workers. */
3559 rhaas@postgresql.org 1054 :CBC 513 : instrument = GetInstrumentationArray(instrumentation);
1055 : 513 : instrument += i * instrumentation->num_workers;
1056 [ + + ]: 1353 : for (n = 0; n < instrumentation->num_workers; ++n)
1057 : 840 : InstrAggNode(planstate->instrument, &instrument[n]);
1058 : :
1059 : : /*
1060 : : * Also store the per-worker detail.
1061 : : *
1062 : : * Worker instrumentation should be allocated in the same context as the
1063 : : * regular instrumentation information, which is the per-query context.
1064 : : * Switch into per-query memory context.
1065 : : */
2824 1066 : 513 : oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
1067 : 513 : ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
1068 : 513 : planstate->worker_instrument =
1069 : 513 : palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
1070 : 513 : MemoryContextSwitchTo(oldcontext);
1071 : :
3559 1072 : 513 : planstate->worker_instrument->num_workers = instrumentation->num_workers;
2824 1073 : 513 : memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
1074 : :
1075 : : /* Perform any node-type-specific work that needs to be done. */
2832 andres@anarazel.de 1076 [ + - - + : 513 : switch (nodeTag(planstate))
- + + - -
+ ]
1077 : : {
179 pg@bowt.ie 1078 : 135 : case T_IndexScanState:
1079 : 135 : ExecIndexScanRetrieveInstrumentation((IndexScanState *) planstate);
1080 : 135 : break;
179 pg@bowt.ie 1081 :UBC 0 : case T_IndexOnlyScanState:
1082 : 0 : ExecIndexOnlyScanRetrieveInstrumentation((IndexOnlyScanState *) planstate);
1083 : 0 : break;
1084 : 0 : case T_BitmapIndexScanState:
1085 : 0 : ExecBitmapIndexScanRetrieveInstrumentation((BitmapIndexScanState *) planstate);
1086 : 0 : break;
2832 andres@anarazel.de 1087 :CBC 6 : case T_SortState:
1088 : 6 : ExecSortRetrieveInstrumentation((SortState *) planstate);
1089 : 6 : break;
1979 tomas.vondra@postgre 1090 :UBC 0 : case T_IncrementalSortState:
1091 : 0 : ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
1092 : 0 : break;
2832 andres@anarazel.de 1093 :CBC 42 : case T_HashState:
1094 : 42 : ExecHashRetrieveInstrumentation((HashState *) planstate);
1095 : 42 : break;
1905 drowley@postgresql.o 1096 : 51 : case T_AggState:
1097 : 51 : ExecAggRetrieveInstrumentation((AggState *) planstate);
1098 : 51 : break;
1515 drowley@postgresql.o 1099 :UBC 0 : case T_MemoizeState:
1100 : 0 : ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
1618 1101 : 0 : break;
424 1102 : 0 : case T_BitmapHeapScanState:
1103 : 0 : ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
1104 : 0 : break;
2832 andres@anarazel.de 1105 :CBC 279 : default:
1106 : 279 : break;
1107 : : }
1108 : :
3631 rhaas@postgresql.org 1109 : 513 : return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
1110 : : instrumentation);
1111 : : }
1112 : :
1113 : : /*
1114 : : * Add up the workers' JIT instrumentation from dynamic shared memory.
1115 : : */
1116 : : static void
2538 andres@anarazel.de 1117 : 12 : ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
1118 : : SharedJitInstrumentation *shared_jit)
1119 : : {
1120 : : JitInstrumentation *combined;
1121 : : int ibytes;
1122 : :
1123 : : int n;
1124 : :
1125 : : /*
1126 : : * Accumulate worker JIT instrumentation into the combined JIT
1127 : : * instrumentation, allocating it if required.
1128 : : */
2530 1129 [ + - ]: 12 : if (!planstate->state->es_jit_worker_instr)
1130 : 12 : planstate->state->es_jit_worker_instr =
2538 1131 : 12 : MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
2530 1132 : 12 : combined = planstate->state->es_jit_worker_instr;
1133 : :
1134 : : /* Accumulate all the workers' instrumentations. */
2538 1135 [ + + ]: 36 : for (n = 0; n < shared_jit->num_workers; ++n)
1136 : 24 : InstrJitAgg(combined, &shared_jit->jit_instr[n]);
1137 : :
1138 : : /*
1139 : : * Store the per-worker detail.
1140 : : *
1141 : : * Similar to ExecParallelRetrieveInstrumentation(), allocate the
1142 : : * instrumentation in per-query context.
1143 : : */
1144 : 12 : ibytes = offsetof(SharedJitInstrumentation, jit_instr)
2299 tgl@sss.pgh.pa.us 1145 : 12 : + mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
2538 andres@anarazel.de 1146 : 12 : planstate->worker_jit_instrument =
1147 : 12 : MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
1148 : :
1149 : 12 : memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
1150 : 12 : }
1151 : :
1152 : : /*
1153 : : * Finish parallel execution. We wait for parallel workers to finish, and
1154 : : * accumulate their buffer/WAL usage.
1155 : : */
1156 : : void
3631 rhaas@postgresql.org 1157 : 882 : ExecParallelFinish(ParallelExecutorInfo *pei)
1158 : : {
2927 tgl@sss.pgh.pa.us 1159 : 882 : int nworkers = pei->pcxt->nworkers_launched;
1160 : : int i;
1161 : :
1162 : : /* Make this be a no-op if called twice in a row. */
3580 rhaas@postgresql.org 1163 [ + + ]: 882 : if (pei->finished)
1164 : 399 : return;
1165 : :
1166 : : /*
1167 : : * Detach from tuple queues ASAP, so that any still-active workers will
1168 : : * notice that no further results are wanted.
1169 : : */
2927 tgl@sss.pgh.pa.us 1170 [ + - ]: 483 : if (pei->tqueue != NULL)
1171 : : {
1172 [ + + ]: 1744 : for (i = 0; i < nworkers; i++)
1173 : 1261 : shm_mq_detach(pei->tqueue[i]);
1174 : 483 : pfree(pei->tqueue);
1175 : 483 : pei->tqueue = NULL;
1176 : : }
1177 : :
1178 : : /*
1179 : : * While we're waiting for the workers to finish, let's get rid of the
1180 : : * tuple queue readers. (Any other local cleanup could be done here too.)
1181 : : */
1182 [ + + ]: 483 : if (pei->reader != NULL)
1183 : : {
1184 [ + + ]: 1735 : for (i = 0; i < nworkers; i++)
1185 : 1261 : DestroyTupleQueueReader(pei->reader[i]);
1186 : 474 : pfree(pei->reader);
1187 : 474 : pei->reader = NULL;
1188 : : }
1189 : :
1190 : : /* Now wait for the workers to finish. */
3631 rhaas@postgresql.org 1191 : 483 : WaitForParallelWorkersToFinish(pei->pcxt);
1192 : :
1193 : : /*
1194 : : * Next, accumulate buffer/WAL usage. (This must wait for the workers to
1195 : : * finish, or we might get incomplete data.)
1196 : : */
2927 tgl@sss.pgh.pa.us 1197 [ + + ]: 1744 : for (i = 0; i < nworkers; i++)
1981 akapila@postgresql.o 1198 : 1261 : InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
1199 : :
3580 rhaas@postgresql.org 1200 : 483 : pei->finished = true;
1201 : : }
1202 : :
1203 : : /*
1204 : : * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
1205 : : * resources still exist after ExecParallelFinish. We separate these
1206 : : * routines because someone might want to examine the contents of the DSM
1207 : : * after ExecParallelFinish and before calling this routine.
1208 : : */
1209 : : void
3613 1210 : 354 : ExecParallelCleanup(ParallelExecutorInfo *pei)
1211 : : {
1212 : : /* Accumulate instrumentation, if any. */
2818 1213 [ + + ]: 354 : if (pei->instrumentation)
1214 : 90 : ExecParallelRetrieveInstrumentation(pei->planstate,
1215 : : pei->instrumentation);
1216 : :
1217 : : /* Accumulate JIT instrumentation, if any. */
2538 andres@anarazel.de 1218 [ + + ]: 354 : if (pei->jit_instrumentation)
1219 : 12 : ExecParallelRetrieveJitInstrumentation(pei->planstate,
2299 tgl@sss.pgh.pa.us 1220 : 12 : pei->jit_instrumentation);
1221 : :
1222 : : /* Free any serialized parameters. */
2851 rhaas@postgresql.org 1223 [ + + ]: 354 : if (DsaPointerIsValid(pei->param_exec))
1224 : : {
1225 : 12 : dsa_free(pei->area, pei->param_exec);
1226 : 12 : pei->param_exec = InvalidDsaPointer;
1227 : : }
3183 1228 [ + - ]: 354 : if (pei->area != NULL)
1229 : : {
1230 : 354 : dsa_detach(pei->area);
1231 : 354 : pei->area = NULL;
1232 : : }
3613 1233 [ + - ]: 354 : if (pei->pcxt != NULL)
1234 : : {
1235 : 354 : DestroyParallelContext(pei->pcxt);
1236 : 354 : pei->pcxt = NULL;
1237 : : }
1238 : 354 : pfree(pei);
1239 : 354 : }
1240 : :
1241 : : /*
1242 : : * Create a DestReceiver to write tuples we produce to the shm_mq designated
1243 : : * for that purpose.
1244 : : */
1245 : : static DestReceiver *
3631 1246 : 1267 : ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
1247 : : {
1248 : : char *mqspace;
1249 : : shm_mq *mq;
1250 : :
3015 tgl@sss.pgh.pa.us 1251 : 1267 : mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
3631 rhaas@postgresql.org 1252 : 1267 : mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
1253 : 1267 : mq = (shm_mq *) mqspace;
1254 : 1267 : shm_mq_set_sender(mq, MyProc);
1255 : 1267 : return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
1256 : : }
1257 : :
1258 : : /*
1259 : : * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
1260 : : */
1261 : : static QueryDesc *
1262 : 1267 : ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
1263 : : int instrument_options)
1264 : : {
1265 : : char *pstmtspace;
1266 : : char *paramspace;
1267 : : PlannedStmt *pstmt;
1268 : : ParamListInfo paramLI;
1269 : : char *queryString;
1270 : :
1271 : : /* Get the query string from shared memory */
3015 tgl@sss.pgh.pa.us 1272 : 1267 : queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
1273 : :
1274 : : /* Reconstruct leader-supplied PlannedStmt. */
1275 : 1267 : pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
3631 rhaas@postgresql.org 1276 : 1267 : pstmt = (PlannedStmt *) stringToNode(pstmtspace);
1277 : :
1278 : : /* Reconstruct ParamListInfo. */
2851 1279 : 1267 : paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
3631 1280 : 1267 : paramLI = RestoreParamList(¶mspace);
1281 : :
1282 : : /* Create a QueryDesc for the query. */
1283 : 1267 : return CreateQueryDesc(pstmt,
1284 : : queryString,
1285 : : GetActiveSnapshot(), InvalidSnapshot,
1286 : : receiver, paramLI, NULL, instrument_options);
1287 : : }
1288 : :
1289 : : /*
1290 : : * Copy instrumentation information from this node and its descendants into
1291 : : * dynamic shared memory, so that the parallel leader can retrieve it.
1292 : : */
1293 : : static bool
1294 : 1188 : ExecParallelReportInstrumentation(PlanState *planstate,
1295 : : SharedExecutorInstrumentation *instrumentation)
1296 : : {
1297 : : int i;
3376 1298 : 1188 : int plan_node_id = planstate->plan->plan_node_id;
1299 : : Instrumentation *instrument;
1300 : :
3559 1301 : 1188 : InstrEndLoop(planstate->instrument);
1302 : :
1303 : : /*
1304 : : * If we shuffled the plan_node_id values in ps_instrument into sorted
1305 : : * order, we could use binary search here. This might matter someday if
1306 : : * we're pushing down sufficiently large plan trees. For now, do it the
1307 : : * slow, dumb way.
1308 : : */
1309 [ + - ]: 3906 : for (i = 0; i < instrumentation->num_plan_nodes; ++i)
1310 [ + + ]: 3906 : if (instrumentation->plan_node_id[i] == plan_node_id)
3631 1311 : 1188 : break;
3559 1312 [ - + ]: 1188 : if (i >= instrumentation->num_plan_nodes)
3631 rhaas@postgresql.org 1313 [ # # ]:UBC 0 : elog(ERROR, "plan node %d not found", plan_node_id);
1314 : :
1315 : : /*
1316 : : * Add our statistics to the per-node, per-worker totals. It's possible
1317 : : * that this could happen more than once if we relaunched workers.
1318 : : */
3559 rhaas@postgresql.org 1319 :CBC 1188 : instrument = GetInstrumentationArray(instrumentation);
1320 : 1188 : instrument += i * instrumentation->num_workers;
1321 [ - + ]: 1188 : Assert(IsParallelWorker());
1322 [ - + ]: 1188 : Assert(ParallelWorkerNumber < instrumentation->num_workers);
1323 : 1188 : InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
1324 : :
3631 1325 : 1188 : return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
1326 : : instrumentation);
1327 : : }
1328 : :
1329 : : /*
1330 : : * Initialize the PlanState and its descendants with the information
1331 : : * retrieved from shared memory. This has to be done once the PlanState
1332 : : * is allocated and initialized by executor; that is, after ExecutorStart().
1333 : : */
1334 : : static bool
2851 andres@anarazel.de 1335 : 4084 : ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
1336 : : {
3587 rhaas@postgresql.org 1337 [ - + ]: 4084 : if (planstate == NULL)
3587 rhaas@postgresql.org 1338 :UBC 0 : return false;
1339 : :
2930 rhaas@postgresql.org 1340 [ + + + + :CBC 4084 : switch (nodeTag(planstate))
- + - + +
+ + - + +
+ ]
1341 : : {
1342 : 1661 : case T_SeqScanState:
1343 [ + + ]: 1661 : if (planstate->plan->parallel_aware)
2851 andres@anarazel.de 1344 : 1347 : ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
2930 rhaas@postgresql.org 1345 : 1661 : break;
1346 : 198 : case T_IndexScanState:
1347 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 1348 : 198 : ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
2930 rhaas@postgresql.org 1349 : 198 : break;
1350 : 121 : case T_IndexOnlyScanState:
1351 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
179 pg@bowt.ie 1352 : 121 : ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
1353 : : pwcxt);
1354 : 121 : break;
1355 : 136 : case T_BitmapIndexScanState:
1356 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1357 : 136 : ExecBitmapIndexScanInitializeWorker((BitmapIndexScanState *) planstate,
1358 : : pwcxt);
2930 rhaas@postgresql.org 1359 : 136 : break;
2930 rhaas@postgresql.org 1360 :UBC 0 : case T_ForeignScanState:
1361 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 1362 : 0 : ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
1363 : : pwcxt);
2930 1364 : 0 : break;
2832 rhaas@postgresql.org 1365 :CBC 189 : case T_AppendState:
1366 [ + + ]: 189 : if (planstate->plan->parallel_aware)
1367 : 159 : ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
1368 : 189 : break;
2930 rhaas@postgresql.org 1369 :UBC 0 : case T_CustomScanState:
1370 [ # # ]: 0 : if (planstate->plan->parallel_aware)
3503 1371 : 0 : ExecCustomScanInitializeWorker((CustomScanState *) planstate,
1372 : : pwcxt);
2930 1373 : 0 : break;
2930 rhaas@postgresql.org 1374 :CBC 136 : case T_BitmapHeapScanState:
1375 [ + + ]: 136 : if (planstate->plan->parallel_aware)
2851 andres@anarazel.de 1376 : 135 : ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
1377 : : pwcxt);
2930 rhaas@postgresql.org 1378 : 136 : break;
2817 andres@anarazel.de 1379 : 273 : case T_HashJoinState:
1380 [ + + ]: 273 : if (planstate->plan->parallel_aware)
1381 : 153 : ExecHashJoinInitializeWorker((HashJoinState *) planstate,
1382 : : pwcxt);
1383 : 273 : break;
2832 1384 : 273 : case T_HashState:
1385 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1386 : 273 : ExecHashInitializeWorker((HashState *) planstate, pwcxt);
1387 : 273 : break;
2930 rhaas@postgresql.org 1388 : 226 : case T_SortState:
1389 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
2851 andres@anarazel.de 1390 : 226 : ExecSortInitializeWorker((SortState *) planstate, pwcxt);
2929 tgl@sss.pgh.pa.us 1391 : 226 : break;
1979 tomas.vondra@postgre 1392 :UBC 0 : case T_IncrementalSortState:
1393 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1394 : 0 : ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
1395 : : pwcxt);
1396 : 0 : break;
1905 drowley@postgresql.o 1397 :CBC 776 : case T_AggState:
1398 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1399 : 776 : ExecAggInitializeWorker((AggState *) planstate, pwcxt);
1400 : 776 : break;
1515 1401 : 6 : case T_MemoizeState:
1402 : : /* even when not parallel-aware, for EXPLAIN ANALYZE */
1403 : 6 : ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
1618 1404 : 6 : break;
2930 rhaas@postgresql.org 1405 : 89 : default:
1406 : 89 : break;
1407 : : }
1408 : :
2851 andres@anarazel.de 1409 : 4084 : return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
1410 : : pwcxt);
1411 : : }
1412 : :
1413 : : /*
1414 : : * Main entrypoint for parallel query worker processes.
1415 : : *
1416 : : * We reach this function from ParallelWorkerMain, so the setup necessary to
1417 : : * create a sensible parallel environment has already been done;
1418 : : * ParallelWorkerMain worries about stuff like the transaction state, combo
1419 : : * CID mappings, and GUC values, so we don't need to deal with any of that
1420 : : * here.
1421 : : *
1422 : : * Our job is to deal with concerns specific to the executor. The parallel
1423 : : * group leader will have stored a serialized PlannedStmt, and it's our job
1424 : : * to execute that plan and write the resulting tuples to the appropriate
1425 : : * tuple queue. Various bits of supporting information that we need in order
1426 : : * to do this are also stored in the dsm_segment and can be accessed through
1427 : : * the shm_toc.
1428 : : */
1429 : : void
3631 rhaas@postgresql.org 1430 : 1267 : ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
1431 : : {
1432 : : FixedParallelExecutorState *fpes;
1433 : : BufferUsage *buffer_usage;
1434 : : WalUsage *wal_usage;
1435 : : DestReceiver *receiver;
1436 : : QueryDesc *queryDesc;
1437 : : SharedExecutorInstrumentation *instrumentation;
1438 : : SharedJitInstrumentation *jit_instrumentation;
1439 : 1267 : int instrument_options = 0;
1440 : : void *area_space;
1441 : : dsa_area *area;
1442 : : ParallelWorkerContext pwcxt;
1443 : :
1444 : : /* Get fixed-size state. */
2930 1445 : 1267 : fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
1446 : :
1447 : : /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
3631 1448 : 1267 : receiver = ExecParallelGetReceiver(seg, toc);
3015 tgl@sss.pgh.pa.us 1449 : 1267 : instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
3631 rhaas@postgresql.org 1450 [ + + ]: 1267 : if (instrumentation != NULL)
1451 : 363 : instrument_options = instrumentation->instrument_options;
2538 andres@anarazel.de 1452 : 1267 : jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
1453 : : true);
3631 rhaas@postgresql.org 1454 : 1267 : queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
1455 : :
1456 : : /* Setting debug_query_string for individual workers */
3118 1457 : 1267 : debug_query_string = queryDesc->sourceText;
1458 : :
1459 : : /* Report workers' query for monitoring purposes */
1460 : 1267 : pgstat_report_activity(STATE_RUNNING, debug_query_string);
1461 : :
1462 : : /* Attach to the dynamic shared memory area. */
3015 tgl@sss.pgh.pa.us 1463 : 1267 : area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
3183 rhaas@postgresql.org 1464 : 1267 : area = dsa_attach_in_place(area_space, seg);
1465 : :
1466 : : /* Start up the executor */
2725 andres@anarazel.de 1467 : 1267 : queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
107 amitlan@postgresql.o 1468 : 1267 : ExecutorStart(queryDesc, fpes->eflags);
1469 : :
1470 : : /* Special executor initialization steps for parallel workers */
3183 rhaas@postgresql.org 1471 : 1267 : queryDesc->planstate->state->es_query_dsa = area;
2851 1472 [ + + ]: 1267 : if (DsaPointerIsValid(fpes->param_exec))
1473 : : {
1474 : : char *paramexec_space;
1475 : :
1476 : 36 : paramexec_space = dsa_get_address(area, fpes->param_exec);
1477 : 36 : RestoreParamExecParams(paramexec_space, queryDesc->estate);
1478 : : }
andres@anarazel.de 1479 : 1267 : pwcxt.toc = toc;
1480 : 1267 : pwcxt.seg = seg;
1481 : 1267 : ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);
1482 : :
1483 : : /* Pass down any tuple bound */
2930 rhaas@postgresql.org 1484 : 1267 : ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
1485 : :
1486 : : /*
1487 : : * Prepare to track buffer/WAL usage during query execution.
1488 : : *
1489 : : * We do this after starting up the executor to match what happens in the
1490 : : * leader, which also doesn't count buffer accesses and WAL activity that
1491 : : * occur during executor startup.
1492 : : */
2591 akapila@postgresql.o 1493 : 1267 : InstrStartParallelQuery();
1494 : :
1495 : : /*
1496 : : * Run the plan. If we specified a tuple bound, be careful not to demand
1497 : : * more tuples than that.
1498 : : */
2930 rhaas@postgresql.org 1499 : 1267 : ExecutorRun(queryDesc,
1500 : : ForwardScanDirection,
271 tgl@sss.pgh.pa.us 1501 : 1267 : fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);
1502 : :
1503 : : /* Shut down the executor */
3631 rhaas@postgresql.org 1504 : 1261 : ExecutorFinish(queryDesc);
1505 : :
1506 : : /* Report buffer/WAL usage during parallel execution. */
3015 tgl@sss.pgh.pa.us 1507 : 1261 : buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
1981 akapila@postgresql.o 1508 : 1261 : wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
1509 : 1261 : InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
1510 : 1261 : &wal_usage[ParallelWorkerNumber]);
1511 : :
1512 : : /* Report instrumentation data if any instrumentation options are set. */
3631 rhaas@postgresql.org 1513 [ + + ]: 1261 : if (instrumentation != NULL)
1514 : 363 : ExecParallelReportInstrumentation(queryDesc->planstate,
1515 : : instrumentation);
1516 : :
1517 : : /* Report JIT instrumentation data if any */
2538 andres@anarazel.de 1518 [ - + - - ]: 1261 : if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
1519 : : {
2538 andres@anarazel.de 1520 [ # # ]:UBC 0 : Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
1521 : 0 : jit_instrumentation->jit_instr[ParallelWorkerNumber] =
1522 : 0 : queryDesc->estate->es_jit->instr;
1523 : : }
1524 : :
1525 : : /* Must do this after capturing instrumentation. */
3629 rhaas@postgresql.org 1526 :CBC 1261 : ExecutorEnd(queryDesc);
1527 : :
1528 : : /* Cleanup. */
3183 1529 : 1261 : dsa_detach(area);
3631 1530 : 1261 : FreeQueryDesc(queryDesc);
2921 peter_e@gmx.net 1531 : 1261 : receiver->rDestroy(receiver);
3631 rhaas@postgresql.org 1532 : 1261 : }
|