/*-------------------------------------------------------------------------
 *
 * parallel.c
 *	  Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/brin.h"
#include "access/gin.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
#include "catalog/storage.h"
#include "commands/async.h"
#include "commands/vacuum.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"

/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_PENDING_SYNCS			UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_REINDEX_STATE			UINT64CONST(0xFFFFFFFFFFFF000C)
#define PARALLEL_KEY_RELMAPPER_STATE		UINT64CONST(0xFFFFFFFFFFFF000D)
#define PARALLEL_KEY_UNCOMMITTEDENUMS		UINT64CONST(0xFFFFFFFFFFFF000E)
#define PARALLEL_KEY_CLIENTCONNINFO			UINT64CONST(0xFFFFFFFFFFFF000F)

/* Fixed-size parallel state. */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			session_user_id;
	Oid			outer_user_id;
	Oid			current_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		session_user_is_superuser;
	bool		role_is_superuser;
	PGPROC	   *parallel_leader_pgproc;
	pid_t		parallel_leader_pid;
	ProcNumber	parallel_leader_proc_number;
	TimestampTz xact_ts;
	TimestampTz stmt_ts;
	SerializableXactHandle serializable_xact_handle;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/* Maximum XactLastRecEnd of any worker. */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/* Is there a parallel message pending which we need to receive? */
volatile sig_atomic_t ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelLeaderPid;

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}			InternalParallelWorkers[] =
{
	{
		"ParallelQueryMain", ParallelQueryMain
	},
	{
		"_bt_parallel_build_main", _bt_parallel_build_main
	},
	{
		"_brin_parallel_build_main", _brin_parallel_build_main
	},
	{
		"_gin_parallel_build_main", _gin_parallel_build_main
	},
	{
		"parallel_vacuum_main", parallel_vacuum_main
	}
};
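
/*
 * Entry points in loadable modules do not need to appear in the array above.
 * A sketch, with hypothetical extension and function names:
 *
 *		pcxt = CreateParallelContext("myextension", "myworker_main", 2);
 *
 * LookupParallelWorkerFunction() resolves such names through the dynamic
 * loader, whereas names paired with library "postgres" are looked up in this
 * table, since resolving symbols in the core executable by name is not
 * portable.
 */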

/* Private functions. */
static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
					  int nworkers)
{
	MemoryContext oldcontext;
	ParallelContext *pcxt;

	/* It is unsafe to create a parallel context if not in parallel mode. */
	Assert(IsInParallelMode());

	/* Number of workers should be non-negative. */
	Assert(nworkers >= 0);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Initialize a new ParallelContext. */
	pcxt = palloc0(sizeof(ParallelContext));
	pcxt->subid = GetCurrentSubTransactionId();
	pcxt->nworkers = nworkers;
	pcxt->nworkers_to_launch = nworkers;
	pcxt->library_name = pstrdup(library_name);
	pcxt->function_name = pstrdup(function_name);
	pcxt->error_context_stack = error_context_stack;
	shm_toc_initialize_estimator(&pcxt->estimator);
	dlist_push_head(&pcxt_list, &pcxt->node);

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);

	return pcxt;
}
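
/*
 * The overall usage pattern, as sketched in this directory's README.parallel
 * (simplified; "key", "size", and "keys" stand in for caller-chosen values):
 *
 *		EnterParallelMode();
 *		pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
 *		shm_toc_estimate_chunk(&pcxt->estimator, size);
 *		shm_toc_estimate_keys(&pcxt->estimator, keys);
 *		InitializeParallelDSM(pcxt);
 *		space = shm_toc_allocate(pcxt->toc, size);
 *		shm_toc_insert(pcxt->toc, key, space);
 *		LaunchParallelWorkers(pcxt);
 *		WaitForParallelWorkersToFinish(pcxt);
 *		DestroyParallelContext(pcxt);
 *		ExitParallelMode();
 */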

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		pendingsyncslen = 0;
	Size		reindexlen = 0;
	Size		relmapperlen = 0;
	Size		uncommittedenumslen = 0;
	Size		clientconninfolen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * If we manage to reach here while non-interruptible, it's unsafe to
	 * launch any workers: we would fail to process interrupts sent by them.
	 * We can deal with that edge case by pretending no workers were
	 * requested.
	 */
	if (!INTERRUPTS_CAN_BE_PROCESSED())
		pcxt->nworkers = 0;

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		pendingsyncslen = EstimatePendingSyncsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
		relmapperlen = EstimateRelationMapSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
		uncommittedenumslen = EstimateUncommittedEnumsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
		clientconninfolen = EstimateClientConnectionInfoSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
		/* If you add more chunks here, you probably need to add keys. */
		shm_toc_estimate_keys(&pcxt->estimator, 12);

		/* Estimate space needed for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments has already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->session_user_id = GetSessionUserId();
	fps->outer_user_id = GetCurrentRoleId();
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	fps->session_user_is_superuser = GetSessionUserIsSuperuser();
	fps->role_is_superuser = current_role_is_superuser;
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_leader_pgproc = MyProc;
	fps->parallel_leader_pid = MyProcPid;
	fps->parallel_leader_proc_number = MyProcNumber;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->serializable_xact_handle = ShareSerializableXact();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *pendingsyncsspace;
		char	   *reindexspace;
		char	   *relmapperspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		char	   *uncommittedenumsspace;
		char	   *clientconninfospace;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction isolation
		 * level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize pending syncs. */
		pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
		SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
					   pendingsyncsspace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/* Serialize relmapper state. */
		relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
		SerializeRelationMap(relmapperlen, relmapperspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
					   relmapperspace);

		/* Serialize uncommitted enum state. */
		uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
												 uncommittedenumslen);
		SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
					   uncommittedenumsspace);

		/* Serialize our ClientConnectionInfo. */
		clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
		SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
					   clientconninfospace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Update nworkers_to_launch, in case we changed nworkers above. */
	pcxt->nworkers_to_launch = pcxt->nworkers;

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
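
/*
 * The worker-side mirror of the setup above, as a sketch: each entrypoint
 * (of type parallel_worker_main_type) receives the attached segment and its
 * table of contents, and fetches whatever the leader stored under a
 * caller-defined key, e.g.
 *
 *		space = shm_toc_lookup(toc, key, false);
 *
 * The PARALLEL_KEY_* entries above are consumed automatically during worker
 * startup in ParallelWorkerMain(), before the entrypoint is called.
 */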

/*
 * Reinitialize the dynamic shared memory segment for a parallel context so
 * that we can launch workers for it again.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	FixedParallelState *fps;

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
		if (pcxt->known_attached_workers)
		{
			pfree(pcxt->known_attached_workers);
			pcxt->known_attached_workers = NULL;
			pcxt->nknown_attached_workers = 0;
		}
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = 0;

	/* Recreate error queues (if they exist). */
	if (pcxt->nworkers > 0)
	{
		char	   *error_queue_space;
		int			i;

		error_queue_space =
			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
	}
}

/*
 * Reinitialize parallel workers for a parallel context so that we can launch
 * a different number of workers.  This is required for cases where we need
 * to reuse the same DSM segment, but the number of workers can vary from
 * run to run.
 */
void
ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
{
	/*
	 * The number of workers that need to be launched must not exceed the
	 * number of workers with which the parallel context was initialized.  But
	 * the caller might not know that InitializeParallelDSM reduced nworkers,
	 * so just silently trim the request.
	 */
	pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
}
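
/*
 * A sketch of the reuse pattern these two functions enable (for example,
 * when a parallel query is re-executed within the same context; simplified):
 *
 *		LaunchParallelWorkers(pcxt);
 *		WaitForParallelWorkersToFinish(pcxt);
 *		ReinitializeParallelDSM(pcxt);
 *		ReinitializeParallelWorkers(pcxt, new_nworkers);	(optional)
 *		LaunchParallelWorkers(pcxt);
 *		WaitForParallelWorkersToFinish(pcxt);
 *		DestroyParallelContext(pcxt);
 */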

/*
 * Launch parallel workers.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers_to_launch; ++i)
	{
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/*
	 * Now that nworkers_launched has taken its final value, we can initialize
	 * known_attached_workers.
	 */
	if (pcxt->nworkers_launched > 0)
	{
		pcxt->known_attached_workers =
			palloc0(sizeof(bool) * pcxt->nworkers_launched);
		pcxt->nknown_attached_workers = 0;
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues.  Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited.  However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable.  It will never be less than the
 * number of workers which actually started, but it might be more.  Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function.  fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable.  However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so.  If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever.  On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
	int			i;

	/* Skip this if we have no launched workers. */
	if (pcxt->nworkers_launched == 0)
		return;

	for (;;)
	{
		/*
		 * This will process any parallel messages that are pending and it may
		 * also throw an error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			BgwHandleStatus status;
			shm_mq	   *mq;
			int			rc;
			pid_t		pid;

			if (pcxt->known_attached_workers[i])
				continue;

			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
			{
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
				continue;
			}

			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
			if (status == BGWH_STARTED)
			{
				/* Has the worker attached to the error queue? */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) != NULL)
				{
					/* Yes, so it is known to be attached. */
					pcxt->known_attached_workers[i] = true;
					++pcxt->nknown_attached_workers;
				}
			}
			else if (status == BGWH_STOPPED)
			{
				/*
				 * If the worker stopped without attaching to the error queue,
				 * throw an error.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
			}
			else
			{
				/*
				 * Worker not yet started, so we must wait.  The postmaster
				 * will notify us if the worker's state changes.  Our latch
				 * might also get set for some other reason, but if so we'll
				 * just end up waiting for the same worker again.
				 */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							   -1, WAIT_EVENT_BGWORKER_STARTUP);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);
			}
		}

		/* If all workers are known to have started, we're done. */
		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
		{
			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
			break;
		}
	}
}

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->known_attached_workers[i])
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
						 WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is
 * that the former just ensures that the last message sent by a worker backend
 * is received by the leader backend, whereas this one ensures complete
 * shutdown.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
	int			i;

	/* Wait until the workers actually die. */
	for (i = 0; i < pcxt->nworkers_launched; ++i)
	{
		BgwHandleStatus status;

		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
			continue;

		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

		/*
		 * If the postmaster kicked the bucket, we have no chance of cleaning
		 * up safely -- we won't be able to tell when our workers are actually
		 * dead.  This doesn't necessitate a PANIC since they will all abort
		 * eventually, but we can't safely continue this session.
		 */
		if (status == BGWH_POSTMASTER_DIED)
			ereport(FATAL,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("postmaster exited during a parallel transaction")));

		/* Release memory. */
		pfree(pcxt->worker[i].bgwhandle);
		pcxt->worker[i].bgwhandle = NULL;
	}
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/* Kill each worker in turn, and forget their error queues. */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/* Free the worker array itself. */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 */
bool
ParallelContextActive(void)
{
	return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * ProcessParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}
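
/*
 * The delivery path, roughly: a worker writes protocol messages into its
 * error queue via the pqmq.c redirection set up in ParallelWorkerMain() and
 * signals the leader with PROCSIG_PARALLEL_MESSAGE; the leader's procsignal
 * handler calls HandleParallelMessageInterrupt() above, and the next
 * CHECK_FOR_INTERRUPTS() in the leader reaches ProcessParallelMessages()
 * below via ProcessInterrupts().
 */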

/*
 * Process any queued protocol messages received from parallel workers.
 */
void
ProcessParallelMessages(void)
{
	dlist_iter	iter;
	MemoryContext oldcontext;

	static MemoryContext hpm_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs.  It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"ProcessParallelMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	ParallelMessagePending = false;

	dlist_foreach(iter, &pcxt_list)
	{
		ParallelContext *pcxt;
		int			i;

		pcxt = dlist_container(ParallelContext, node, iter.cur);
		if (pcxt->worker == NULL)
			continue;

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * Read as many messages as we can from each worker, but stop when
			 * either (1) the worker's error queue goes away, which can happen
			 * if we receive a Terminate message from the worker; or (2) no
			 * more messages can be read from the worker without blocking.
			 */
			while (pcxt->worker[i].error_mqh != NULL)
			{
				shm_mq_result res;
				Size		nbytes;
				void	   *data;

				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					ProcessParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}

/*
 * Process a single protocol message received from a single parallel worker.
 */
static void
ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;

	if (pcxt->known_attached_workers != NULL &&
		!pcxt->known_attached_workers[i])
	{
		pcxt->known_attached_workers[i] = true;
		pcxt->nknown_attached_workers++;
	}

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case PqMsg_ErrorResponse:
		case PqMsg_NoticeResponse:
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;

				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);

				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;

				break;
			}

		case PqMsg_NotificationResponse:
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;

				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);

				NotifyMyFrontEnd(channel, payload, pid);

				break;
			}

		case PqMsg_Progress:
			{
				/*
				 * Only incremental progress reporting is currently supported.
				 * However, it's possible to add more fields to the message to
				 * allow for handling of other backend progress APIs.
				 */
				int			index = pq_getmsgint(msg, 4);
				int64		incr = pq_getmsgint64(msg);

				pq_getmsgend(msg);

				pgstat_progress_incr_param(index, incr);

				break;
			}

		case PqMsg_Terminate:
			{
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}
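
/*
 * Note on PqMsg_Terminate above: a worker sends that message ('X') when its
 * entrypoint returns normally, near the end of ParallelWorkerMain().
 * Detaching the error queue here is what lets WaitForParallelWorkersToFinish()
 * treat the worker as cleanly finished.
 */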
1246 : :
1247 : : /*
1248 : : * End-of-subtransaction cleanup for parallel contexts.
1249 : : *
1250 : : * Here we remove only parallel contexts initiated within the current
1251 : : * subtransaction.
1252 : : */
1253 : : void
1254 : 9084 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1255 : : {
1256 [ + + ]: 9087 : while (!dlist_is_empty(&pcxt_list))
1257 : : {
1258 : : ParallelContext *pcxt;
1259 : :
1260 : 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1261 [ - + ]: 3 : if (pcxt->subid != mySubId)
3782 rhaas@postgresql.org 1262 :UIC 0 : break;
3782 rhaas@postgresql.org 1263 [ - + ]:GIC 3 : if (isCommit)
3782 rhaas@postgresql.org 1264 [ # # ]:UIC 0 : elog(WARNING, "leaked parallel context");
3782 rhaas@postgresql.org 1265 :GIC 3 : DestroyParallelContext(pcxt);
1266 : : }
1267 : 9084 : }
1268 : :
1269 : : /*
1270 : : * End-of-transaction cleanup for parallel contexts.
1271 : : *
1272 : : * We nuke all remaining parallel contexts.
1273 : : */
1274 : : void
1275 : 316890 : AtEOXact_Parallel(bool isCommit)
1276 : : {
1277 [ + + ]: 316893 : while (!dlist_is_empty(&pcxt_list))
1278 : : {
1279 : : ParallelContext *pcxt;
1280 : :
1281 : 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1282 [ - + ]: 3 : if (isCommit)
3782 rhaas@postgresql.org 1283 [ # # ]:UIC 0 : elog(WARNING, "leaked parallel context");
3782 rhaas@postgresql.org 1284 :GIC 3 : DestroyParallelContext(pcxt);
1285 : : }
1286 : 316890 : }
1287 : :
1288 : : /*
1289 : : * Main entrypoint for parallel workers.
1290 : : */
1291 : : void
1292 : 1378 : ParallelWorkerMain(Datum main_arg)
1293 : : {
1294 : : dsm_segment *seg;
1295 : : shm_toc *toc;
1296 : : FixedParallelState *fps;
1297 : : char *error_queue_space;
1298 : : shm_mq *mq;
1299 : : shm_mq_handle *mqh;
1300 : : char *libraryspace;
1301 : : char *entrypointstate;
1302 : : char *library_name;
1303 : : char *function_name;
1304 : : parallel_worker_main_type entrypt;
1305 : : char *gucspace;
1306 : : char *combocidspace;
1307 : : char *tsnapspace;
1308 : : char *asnapspace;
1309 : : char *tstatespace;
1310 : : char *pendingsyncsspace;
1311 : : char *reindexspace;
1312 : : char *relmapperspace;
1313 : : char *uncommittedenumsspace;
1314 : : char *clientconninfospace;
1315 : : char *session_dsm_handle_space;
1316 : : Snapshot tsnapshot;
1317 : : Snapshot asnapshot;
1318 : :
1319 : : /* Set flag to indicate that we're initializing a parallel worker. */
3613 1320 : 1378 : InitializingParallelWorker = true;
1321 : :
1322 : : /* Establish signal handlers. */
3782 1323 : 1378 : pqsignal(SIGTERM, die);
1324 : 1378 : BackgroundWorkerUnblockSignals();
1325 : :
1326 : : /* Determine and set our parallel worker number. */
3593 1327 [ - + ]: 1378 : Assert(ParallelWorkerNumber == -1);
1328 : 1378 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1329 : :
1330 : : /* Set up a memory context to work in, just for cleanliness. */
3782 1331 : 1378 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1332 : : "Parallel worker",
1333 : : ALLOCSET_DEFAULT_SIZES);
1334 : :
1335 : : /*
1336 : : * Attach to the dynamic shared memory segment for the parallel query, and
1337 : : * find its table of contents.
1338 : : *
1339 : : * Note: at this point, we have not created any ResourceOwner in this
1340 : : * process. This will result in our DSM mapping surviving until process
1341 : : * exit, which is fine. If there were a ResourceOwner, it would acquire
1342 : : * ownership of the mapping, but we have no need for that.
1343 : : */
1344 : 1378 : seg = dsm_attach(DatumGetUInt32(main_arg));
1345 [ - + ]: 1378 : if (seg == NULL)
3782 rhaas@postgresql.org 1346 [ # # ]:UIC 0 : ereport(ERROR,
1347 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1348 : : errmsg("could not map dynamic shared memory segment")));
3782 rhaas@postgresql.org 1349 :GIC 1378 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1350 [ - + ]: 1378 : if (toc == NULL)
3782 rhaas@postgresql.org 1351 [ # # ]:UIC 0 : ereport(ERROR,
1352 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1353 : : errmsg("invalid magic number in dynamic shared memory segment")));
1354 : :
1355 : : /* Look up fixed parallel state. */
3015 tgl@sss.pgh.pa.us 1356 :GIC 1378 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
3782 rhaas@postgresql.org 1357 : 1378 : MyFixedParallelState = fps;
1358 : :
1359 : : /* Arrange to signal the leader if we exit. */
1910 andres@anarazel.de 1360 : 1378 : ParallelLeaderPid = fps->parallel_leader_pid;
552 heikki.linnakangas@i 1361 : 1378 : ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
1620 andres@anarazel.de 1362 : 1378 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1363 : :
1364 : : /*
1365 : : * Now we can find and attach to the error queue provided for us. That's
1366 : : * good, because until we do that, any errors that happen here will not be
1367 : : * reported back to the process that requested that this worker be
1368 : : * launched.
1369 : : */
3015 tgl@sss.pgh.pa.us 1370 : 1378 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
3782 rhaas@postgresql.org 1371 : 1378 : mq = (shm_mq *) (error_queue_space +
3759 bruce@momjian.us 1372 : 1378 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
3782 rhaas@postgresql.org 1373 : 1378 : shm_mq_set_sender(mq, MyProc);
1374 : 1378 : mqh = shm_mq_attach(mq, seg, NULL);
3613 1375 : 1378 : pq_redirect_to_shm_mq(seg, mqh);
1910 andres@anarazel.de 1376 : 1378 : pq_set_parallel_leader(fps->parallel_leader_pid,
1377 : : fps->parallel_leader_proc_number);
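
	/*
	 * Illustrative note (not in the original): from here on, messages this
	 * worker emits travel through mq to the leader, whose
	 * HandleParallelMessages() forwards NOTICE/WARNING to the client and
	 * rethrows an ERROR in the leader; nothing raised below is silently
	 * lost.
	 */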
1378 : :
1379 : : /*
1380 : : * Hooray! Primary initialization is complete. Now, we need to set up our
1381 : : * backend-local state to match the original backend.
1382 : : */
1383 : :
1384 : : /*
1385 : : * Join locking group. We must do this before anything that could try to
1386 : : * acquire a heavyweight lock, because any heavyweight locks acquired to
1387 : : * this point could block either directly against the parallel group
1388 : : * leader or against some process which in turn waits for a lock that
1389 : : * conflicts with the parallel group leader, causing an undetected
1390 : : * deadlock. (If we can't join the lock group, the leader has gone away,
1391 : : * so just exit quietly.)
1392 : : */
1393 [ - + ]: 1378 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1394 : : fps->parallel_leader_pid))
3499 rhaas@postgresql.org 1395 :UIC 0 : return;
1396 : :
1397 : : /*
1398 : : * Restore transaction and statement start-time timestamps. This must
1399 : : * happen before anything that would start a transaction, else asserts in
1400 : : * xact.c will fire.
1401 : : */
2527 tgl@sss.pgh.pa.us 1402 :GIC 1378 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1403 : :
1404 : : /*
1405 : : * Identify the entry point to be called. In theory this could result in
1406 : : * loading an additional library, though most likely the entry point is in
1407 : : * the core backend or in a library we just loaded.
1408 : : */
3015 1409 : 1378 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
3067 1410 : 1378 : library_name = entrypointstate;
1411 : 1378 : function_name = entrypointstate + strlen(library_name) + 1;
1412 : :
1413 : 1378 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1414 : :
1415 : : /*
1416 : : * Restore current session authorization and role id. No verification
1417 : : * happens here, we just blindly adopt the leader's state. Note that this
1418 : : * has to happen before InitPostgres, since InitializeSessionUserId will
1419 : : * not set these variables.
1420 : : */
299 1421 : 1378 : SetAuthenticatedUserId(fps->authenticated_user_id);
1422 : 1378 : SetSessionAuthorization(fps->session_user_id,
1423 : 1378 : fps->session_user_is_superuser);
1424 : 1378 : SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
1425 : :
1426 : : /*
1427 : : * Restore database connection. We skip connection authorization checks,
1428 : : * reasoning that (a) the leader checked these things when it started, and
1429 : : * (b) we do not want parallel mode to cause these failures, because that
1430 : : * would make the use of parallel query plans not transparent to applications.
1431 : : */
3782 rhaas@postgresql.org 1432 : 1378 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1433 : : fps->authenticated_user_id,
1434 : : BGWORKER_BYPASS_ALLOWCONN |
1435 : : BGWORKER_BYPASS_ROLELOGINCHECK);
1436 : :
1437 : : /*
1438 : : * Set the client encoding to the database encoding, since that is what
1439 : : * the leader will expect. (We're cheating a bit by not calling
1440 : : * PrepareClientEncoding first. It's okay because this call will always
1441 : : * result in installing a no-op conversion. No error should be possible,
1442 : : * but check anyway.)
1443 : : */
396 tgl@sss.pgh.pa.us 1444 [ - + ]: 1378 : if (SetClientEncoding(GetDatabaseEncoding()) < 0)
396 tgl@sss.pgh.pa.us 1445 [ # # ]:UIC 0 : elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
1446 : :
1447 : : /*
1448 : : * Load libraries that were loaded by original backend. We want to do
1449 : : * this before restoring GUCs, because the libraries might define custom
1450 : : * variables.
1451 : : */
2543 tmunro@postgresql.or 1452 :GIC 1378 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1453 : 1378 : StartTransactionCommand();
1454 : 1378 : RestoreLibraryState(libraryspace);
3782 rhaas@postgresql.org 1455 : 1378 : CommitTransactionCommand();
1456 : :
1457 : : /* Crank up a transaction state appropriate to a parallel worker. */
3015 tgl@sss.pgh.pa.us 1458 : 1378 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
3782 rhaas@postgresql.org 1459 : 1378 : StartParallelWorkerTransaction(tstatespace);
1460 : :
1461 : : /*
1462 : : * Restore state that affects catalog access. Ideally we'd do this even
1463 : : * before calling InitPostgres, but that has order-of-initialization
1464 : : * problems, and also the relmapper would get confused during the
1465 : : * CommitTransactionCommand call above.
1466 : : */
317 noah@leadboat.com 1467 : 1378 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1468 : : false);
1469 : 1378 : RestorePendingSyncs(pendingsyncsspace);
352 tgl@sss.pgh.pa.us 1470 : 1378 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1471 : 1378 : RestoreRelationMap(relmapperspace);
1472 : 1378 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1473 : 1378 : RestoreReindexState(reindexspace);
3015 1474 : 1378 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
3782 rhaas@postgresql.org 1475 : 1378 : RestoreComboCIDState(combocidspace);
1476 : :
1477 : : /* Attach to the per-session DSM segment and contained objects. */
1478 : : session_dsm_handle_space =
2914 andres@anarazel.de 1479 : 1378 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1480 : 1378 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1481 : :
1482 : : /*
1483 : : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1484 : : * the leader has serialized the transaction snapshot and we must restore
1485 : : * it. At lower isolation levels, there is no transaction-lifetime
1486 : : * snapshot, but we need TransactionXmin to get set to a value which is
1487 : : * less than or equal to the xmin of every snapshot that will be used by
1488 : : * this worker. The easiest way to accomplish that is to install the
1489 : : * active snapshot as the transaction snapshot. Code running in this
1490 : : * parallel worker might take new snapshots via GetTransactionSnapshot()
1491 : : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1492 : : * snapshot older than the active snapshot.
1493 : : */
3015 tgl@sss.pgh.pa.us 1494 : 1378 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1473 rhaas@postgresql.org 1495 : 1378 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1496 : 1378 : asnapshot = RestoreSnapshot(asnapspace);
1497 [ + + ]: 1378 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1498 : 1378 : RestoreTransactionSnapshot(tsnapshot,
1499 : 1378 : fps->parallel_leader_pgproc);
1500 : 1378 : PushActiveSnapshot(asnapshot);
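
	/*
	 * Illustrative summary (not in the original) of the two cases above:
	 *
	 *	 READ COMMITTED:		  tsnapspace == NULL, tsnapshot = asnapshot
	 *	 REPEATABLE READ and up:  tsnapspace != NULL, tsnapshot = the
	 *							  leader's serialized transaction snapshot
	 *
	 * Either way, TransactionXmin ends up no greater than the xmin of any
	 * snapshot this worker will use.
	 */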
1501 : :
1502 : : /*
1503 : : * We've changed which tuples we can see, and must therefore invalidate
1504 : : * system caches.
1505 : : */
3613 1506 : 1378 : InvalidateSystemCaches();
1507 : :
1508 : : /*
1509 : : * Restore GUC values from launching backend. We can't do this earlier,
1510 : : * because GUC check hooks that do catalog lookups need to see the same
1511 : : * database state as the leader. Also, the check hooks for
1512 : : * session_authorization and role assume we already set the correct role
1513 : : * OIDs.
1514 : : */
396 tgl@sss.pgh.pa.us 1515 : 1378 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1516 : 1378 : RestoreGUCState(gucspace);
1517 : :
1518 : : /*
1519 : : * Restore current user ID and security context. No verification happens
1520 : : * here, we just blindly adopt the leader's state. We can't do this till
1521 : : * after restoring GUCs, else we'll get complaints about restoring
1522 : : * session_authorization and role. (In effect, we're assuming that all
1523 : : * the restored values are okay to set, even if we are now inside a
1524 : : * restricted context.)
1525 : : */
3782 rhaas@postgresql.org 1526 : 1378 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1527 : :
1528 : : /* Restore temp-namespace state to ensure search path matches leader's. */
3376 tgl@sss.pgh.pa.us 1529 : 1378 : SetTempNamespaceState(fps->temp_namespace_id,
1530 : : fps->temp_toast_namespace_id);
1531 : :
1532 : : /* Restore uncommitted enums. */
1705 tmunro@postgresql.or 1533 : 1378 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1534 : : false);
1535 : 1378 : RestoreUncommittedEnums(uncommittedenumsspace);
1536 : :
1537 : : /* Restore the ClientConnectionInfo. */
1109 michael@paquier.xyz 1538 : 1378 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1539 : : false);
1540 : 1378 : RestoreClientConnectionInfo(clientconninfospace);
1541 : :
1542 : : /*
1543 : : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1544 : : * ensure that auth_method is actually valid, i.e., that authn_id is not NULL.
1545 : : */
1073 1546 [ + + ]: 1378 : if (MyClientConnectionInfo.authn_id)
1547 : 4 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1548 : : hba_authname(MyClientConnectionInfo.auth_method));
1549 : :
1550 : : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
2367 tmunro@postgresql.or 1551 : 1378 : AttachSerializableXact(fps->serializable_xact_handle);
1552 : :
1553 : : /*
1554 : : * We've initialized all of our state now; nothing should change
1555 : : * hereafter.
1556 : : */
3613 rhaas@postgresql.org 1557 : 1378 : InitializingParallelWorker = false;
3782 1558 : 1378 : EnterParallelMode();
1559 : :
1560 : : /*
1561 : : * Time to do the real work: invoke the caller-supplied code.
1562 : : */
3067 tgl@sss.pgh.pa.us 1563 : 1378 : entrypt(seg, toc);
1564 : :
1565 : : /* Must exit parallel mode to pop active snapshot. */
3782 rhaas@postgresql.org 1566 : 1372 : ExitParallelMode();
1567 : :
1568 : : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1569 : 1372 : PopActiveSnapshot();
1570 : :
1571 : : /* Shut down the parallel-worker transaction. */
1572 : 1372 : EndParallelWorkerTransaction();
1573 : :
1574 : : /* Detach from the per-session DSM segment. */
2914 andres@anarazel.de 1575 : 1372 : DetachSession();
1576 : :
1577 : : /* Report success. */
746 nathan@postgresql.or 1578 : 1372 : pq_putmessage(PqMsg_Terminate, NULL, 0);
1579 : : }
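
/*
 * Illustrative note (not in the original): if entrypt() above raises an
 * ERROR, none of the cleanup calls after it run.  Instead, the
 * ErrorResponse is written to the shm_mq error queue attached earlier, and
 * ParallelWorkerShutdown() (registered via before_shmem_exit) pokes the
 * leader one last time on the way out.
 */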
1580 : :
1581 : : /*
1582 : : * Update shared memory with the ending location of the last WAL record we
1583 : : * wrote, if it's greater than the value already stored there.
1584 : : */
1585 : : void
3782 rhaas@postgresql.org 1586 : 1372 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1587 : : {
1588 : 1372 : FixedParallelState *fps = MyFixedParallelState;
1589 : :
1590 [ - + ]: 1372 : Assert(fps != NULL);
1591 [ + + ]: 1372 : SpinLockAcquire(&fps->mutex);
1592 [ + + ]: 1372 : if (fps->last_xlog_end < last_xlog_end)
1593 : 72 : fps->last_xlog_end = last_xlog_end;
1594 : 1372 : SpinLockRelease(&fps->mutex);
1595 : 1372 : }
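
/*
 * Illustrative sketch (not part of parallel.c) of the leader-side
 * counterpart: WaitForParallelWorkersToFinish(), earlier in this file,
 * folds each worker's reported location back into the leader, roughly
 *
 *		if (fps->last_xlog_end > XactLastRecEnd)
 *			XactLastRecEnd = fps->last_xlog_end;
 *
 * so that the leader's eventual commit also waits for WAL written by its
 * workers to be flushed.
 */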
1596 : :
1597 : : /*
1598 : : * Make sure the leader tries to read from our error queue one more time.
1599 : : * This guards against the case where we exit uncleanly without sending an
1600 : : * ErrorResponse to the leader, for example because some code calls proc_exit
1601 : : * directly.
1602 : : *
1603 : : * Also explicitly detach from dsm segment so that subsystems using
1604 : : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1605 : : * shut down as part of a before_shmem_exit() hook.
1606 : : *
1607 : : * One might think this could instead be solved by carefully ordering the
1608 : : * attachment of dsm segments, so that the pgstats segments are detached
1609 : : * later than the parallel query one. That turns out not to work because the
1610 : : * stats hash might need to grow, which can cause new segments to be
1611 : : * allocated that are then detached earlier.
1612 : : */
1613 : : static void
2783 1614 : 1378 : ParallelWorkerShutdown(int code, Datum arg)
1615 : : {
1910 andres@anarazel.de 1616 : 1378 : SendProcSignal(ParallelLeaderPid,
1617 : : PROCSIG_PARALLEL_MESSAGE,
1618 : : ParallelLeaderProcNumber);
1619 : :
1620 1620 : 1378 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
2783 rhaas@postgresql.org 1621 : 1378 : }
1622 : :
1623 : : /*
1624 : : * Look up (and possibly load) a parallel worker entry point function.
1625 : : *
1626 : : * For functions contained in the core code, we use library name "postgres"
1627 : : * and consult the InternalParallelWorkers array. External functions are
1628 : : * looked up, and loaded if necessary, using load_external_function().
1629 : : *
1630 : : * The point of this is to pass function names as strings across process
1631 : : * boundaries. We can't pass actual function addresses because of the
1632 : : * possibility that the function has been loaded at a different address
1633 : : * in a different process. This is obviously a hazard for functions in
1634 : : * loadable libraries, but it can happen even for functions in the core code
1635 : : * on platforms using EXEC_BACKEND (e.g., Windows).
1636 : : *
1637 : : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1638 : : * in favor of applying load_external_function() for core functions too;
1639 : : * but that raises portability issues that are not worth addressing now.
1640 : : */
1641 : : static parallel_worker_main_type
3067 tgl@sss.pgh.pa.us 1642 : 1378 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1643 : : {
1644 : : /*
1645 : : * If the function is to be loaded from postgres itself, search the
1646 : : * InternalParallelWorkers array.
1647 : : */
1648 [ + - ]: 1378 : if (strcmp(libraryname, "postgres") == 0)
1649 : : {
1650 : : int i;
1651 : :
1652 [ + - ]: 1588 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1653 : : {
1654 [ + + ]: 1588 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1655 : 1378 : return InternalParallelWorkers[i].fn_addr;
1656 : : }
1657 : :
1658 : : /* We can only reach this by programming error. */
3067 tgl@sss.pgh.pa.us 1659 [ # # ]:UIC 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1660 : : }
1661 : :
1662 : : /* Otherwise load from external library. */
1663 : 0 : return (parallel_worker_main_type)
1664 : 0 : load_external_function(libraryname, funcname, true, NULL);
1665 : : }
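
/*
 * Illustrative sketch (not part of parallel.c): reaching an external
 * library's entry point through the name-based lookup above.  "myext" and
 * myext_worker_main are hypothetical.  The leader names the function as a
 * string, e.g. CreateParallelContext("myext", "myext_worker_main", 2), and
 * each worker resolves it here via load_external_function().  In myext.c
 * the function must be exported and match parallel_worker_main_type:
 */
PG_MODULE_MAGIC;

PGDLLEXPORT void myext_worker_main(dsm_segment *seg, shm_toc *toc);

void
myext_worker_main(dsm_segment *seg, shm_toc *toc)
{
	/* Hypothetical key from the leader-side sketch shown earlier. */
	int		   *state = (int *) shm_toc_lookup(toc, MYEXT_KEY_STATE, false);

	/* Transaction state, snapshots, and GUCs are already restored. */
	elog(DEBUG1, "worker %d sees shared state %d",
		 ParallelWorkerNumber, *state);
}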