Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * parallel.c
4 : : * Infrastructure for launching parallel workers
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/access/transam/parallel.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/brin.h"
18 : : #include "access/gin.h"
19 : : #include "access/nbtree.h"
20 : : #include "access/parallel.h"
21 : : #include "access/session.h"
22 : : #include "access/xact.h"
23 : : #include "access/xlog.h"
24 : : #include "catalog/index.h"
25 : : #include "catalog/namespace.h"
26 : : #include "catalog/pg_enum.h"
27 : : #include "catalog/storage.h"
28 : : #include "commands/async.h"
29 : : #include "commands/vacuum.h"
30 : : #include "executor/execParallel.h"
31 : : #include "libpq/libpq.h"
32 : : #include "libpq/pqformat.h"
33 : : #include "libpq/pqmq.h"
34 : : #include "miscadmin.h"
35 : : #include "optimizer/optimizer.h"
36 : : #include "pgstat.h"
37 : : #include "storage/ipc.h"
38 : : #include "storage/predicate.h"
39 : : #include "storage/proc.h"
40 : : #include "storage/spin.h"
41 : : #include "tcop/tcopprot.h"
42 : : #include "utils/combocid.h"
43 : : #include "utils/guc.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/memutils.h"
46 : : #include "utils/relmapper.h"
47 : : #include "utils/snapmgr.h"
48 : : #include "utils/wait_event.h"
49 : :
50 : : /*
51 : : * We don't want to waste a lot of memory on an error queue which, most of
52 : : * the time, will process only a handful of small messages. However, it is
53 : : * desirable to make it large enough that a typical ErrorResponse can be sent
54 : : * without blocking. That way, a worker that errors out can write the whole
55 : : * message into the queue and terminate without waiting for the user backend.
56 : : */
57 : : #define PARALLEL_ERROR_QUEUE_SIZE 16384
58 : :
59 : : /* Magic number for parallel context TOC. */
60 : : #define PARALLEL_MAGIC 0x50477c7c
61 : :
62 : : /*
63 : : * Magic numbers for per-context parallel state sharing. Higher-level code
64 : : * should use smaller values, leaving these very large ones for use by this
65 : : * module.
66 : : */
67 : : #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001)
68 : : #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002)
69 : : #define PARALLEL_KEY_LIBRARY UINT64CONST(0xFFFFFFFFFFFF0003)
70 : : #define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0004)
71 : : #define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0005)
72 : : #define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
73 : : #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007)
74 : : #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
75 : : #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
76 : : #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
77 : : #define PARALLEL_KEY_PENDING_SYNCS UINT64CONST(0xFFFFFFFFFFFF000B)
78 : : #define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000C)
79 : : #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D)
80 : : #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E)
81 : : #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F)
82 : :
/*
 * Fixed-size parallel state, stored at PARALLEL_KEY_FIXED in the DSM
 * segment.  The leader fills this in from its own backend state in
 * InitializeParallelDSM(); workers read it back to reconstruct a matching
 * execution environment before running any user code.
 */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;	/* leader's MyDatabaseId */
	Oid			authenticated_user_id;	/* from GetAuthenticatedUserId() */
	Oid			session_user_id;	/* from GetSessionUserId() */
	Oid			outer_user_id;	/* from GetCurrentRoleId() */
	Oid			current_user_id;	/* from GetUserIdAndSecContext() */
	Oid			temp_namespace_id;	/* from GetTempNamespaceState() */
	Oid			temp_toast_namespace_id;	/* ditto, TOAST side */
	int			sec_context;	/* from GetUserIdAndSecContext() */
	bool		session_user_is_superuser;
	bool		role_is_superuser;
	PGPROC	   *parallel_leader_pgproc; /* leader's MyProc */
	pid_t		parallel_leader_pid;	/* leader's MyProcPid */
	ProcNumber	parallel_leader_proc_number;	/* leader's MyProcNumber */
	TimestampTz xact_ts;		/* transaction start timestamp */
	TimestampTz stmt_ts;		/* statement start timestamp */
	SerializableXactHandle serializable_xact_handle;	/* from
														 * ShareSerializableXact() */

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/* Maximum XactLastRecEnd of any worker. */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;
110 : :
111 : : /*
112 : : * Our parallel worker number. We initialize this to -1, meaning that we are
113 : : * not a parallel worker. In parallel workers, it will be set to a value >= 0
114 : : * and < the number of workers before any user code is invoked; each parallel
115 : : * worker will get a different parallel worker number.
116 : : */
117 : : int ParallelWorkerNumber = -1;
118 : :
119 : : /* Is there a parallel message pending which we need to receive? */
120 : : volatile sig_atomic_t ParallelMessagePending = false;
121 : :
122 : : /* Are we initializing a parallel worker? */
123 : : bool InitializingParallelWorker = false;
124 : :
125 : : /* Pointer to our fixed parallel state. */
126 : : static FixedParallelState *MyFixedParallelState;
127 : :
128 : : /* List of active parallel contexts. */
129 : : static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
130 : :
131 : : /* Backend-local copy of data from FixedParallelState. */
132 : : static pid_t ParallelLeaderPid;
133 : :
/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below: function
 * pointers cannot be passed through shared memory (they may differ across
 * processes in EXEC_BACKEND builds), so core-backend entry points are
 * identified by name and resolved via this table in the worker.
 */
static const struct
{
	const char *fn_name;		/* name serialized into the DSM segment */
	parallel_worker_main_type fn_addr;	/* corresponding entry point */
} InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	},
	{
		"_bt_parallel_build_main", _bt_parallel_build_main
	},
	{
		"_brin_parallel_build_main", _brin_parallel_build_main
	},
	{
		"_gin_parallel_build_main", _gin_parallel_build_main
	},
	{
		"parallel_vacuum_main", parallel_vacuum_main
	}
};
161 : :
162 : : /* Private functions. */
163 : : static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
164 : : static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
165 : : static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
166 : : static void ParallelWorkerShutdown(int code, Datum arg);
167 : :
168 : :
/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 *
 * library_name/function_name identify the worker entry point (library
 * "postgres" for core-backend functions); nworkers is the number of
 * workers the caller would like to use, which must be non-negative.
 * Returns a ParallelContext allocated in TopTransactionContext; no shared
 * memory is set up until InitializeParallelDSM() is called.
 */
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
					  int nworkers)
{
	MemoryContext oldcontext;
	ParallelContext *pcxt;

	/* It is unsafe to create a parallel context if not in parallel mode. */
	Assert(IsInParallelMode());

	/* Number of workers should be non-negative. */
	Assert(nworkers >= 0);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Initialize a new ParallelContext. */
	pcxt = palloc0_object(ParallelContext);
	pcxt->subid = GetCurrentSubTransactionId();
	pcxt->nworkers = nworkers;
	pcxt->nworkers_to_launch = nworkers;
	pcxt->library_name = pstrdup(library_name);
	pcxt->function_name = pstrdup(function_name);
	pcxt->error_context_stack = error_context_stack;
	shm_toc_initialize_estimator(&pcxt->estimator);
	/* Track the context so error cleanup can find it. */
	dlist_push_head(&pcxt_list, &pcxt->node);

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);

	return pcxt;
}
206 : :
/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 *
 * NOTE: the function runs in two mirrored phases: first every piece of
 * shared state is *estimated* (shm_toc_estimate_chunk/keys), then, after
 * the segment is created, each piece is *allocated and serialized* in the
 * same order.  If you add a chunk to one phase you must add it to the
 * other, and bump the key count.  May reduce pcxt->nworkers to 0 if
 * workers cannot safely be launched (non-interruptible section, no
 * per-session DSM, or DSM segment limit reached).
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		pendingsyncslen = 0;
	Size		reindexlen = 0;
	Size		relmapperlen = 0;
	Size		uncommittedenumslen = 0;
	Size		clientconninfolen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * If we manage to reach here while non-interruptible, it's unsafe to
	 * launch any workers: we would fail to process interrupts sent by them.
	 * We can deal with that edge case by pretending no workers were
	 * requested.
	 */
	if (!INTERRUPTS_CAN_BE_PROCESSED())
		pcxt->nworkers = 0;

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	if (pcxt->nworkers > 0)
	{
		StaticAssertDecl(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");

		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		/* Transaction snapshot is only shared at snapshot isolation+. */
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		pendingsyncslen = EstimatePendingSyncsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
		relmapperlen = EstimateRelationMapSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
		uncommittedenumslen = EstimateUncommittedEnumsSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
		clientconninfolen = EstimateClientConnectionInfoSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
		/* If you add more chunks here, you probably need to add keys. */
		shm_toc_estimate_keys(&pcxt->estimator, 12);

		/* Estimate space need for error queues. */
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		/* Fallback: private memory, and no workers will be launched. */
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->session_user_id = GetSessionUserId();
	fps->outer_user_id = GetCurrentRoleId();
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	fps->session_user_is_superuser = GetSessionUserIsSuperuser();
	fps->role_is_superuser = current_role_is_superuser;
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_leader_pgproc = MyProc;
	fps->parallel_leader_pid = MyProcPid;
	fps->parallel_leader_proc_number = MyProcNumber;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->serializable_xact_handle = ShareSerializableXact();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = InvalidXLogRecPtr;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *pendingsyncsspace;
		char	   *reindexspace;
		char	   *relmapperspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		char	   *uncommittedenumsspace;
		char	   *clientconninfospace;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction isolation
		 * level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize pending syncs. */
		pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
		SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
					   pendingsyncsspace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/* Serialize relmapper state. */
		relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
		SerializeRelationMap(relmapperlen, relmapperspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
					   relmapperspace);

		/* Serialize uncommitted enum state. */
		uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
												 uncommittedenumslen);
		SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
					   uncommittedenumsspace);

		/* Serialize our ClientConnectionInfo. */
		clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
		SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
					   clientconninfospace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0_array(ParallelWorkerInfo, pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		/* Two NUL-terminated strings packed back to back in one chunk. */
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Update nworkers_to_launch, in case we changed nworkers above. */
	pcxt->nworkers_to_launch = pcxt->nworkers;

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
505 : :
/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 *
 * Must be called only after any previously launched workers have been
 * waited for; we first wait for them to finish and exit, then reset the
 * per-run shared state (last_xlog_end) and recreate the error queues.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	FixedParallelState *fps;

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
		/* Discard attach bookkeeping; it's rebuilt at the next launch. */
		if (pcxt->known_attached_workers)
		{
			pfree(pcxt->known_attached_workers);
			pcxt->known_attached_workers = NULL;
			pcxt->nknown_attached_workers = 0;
		}
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = InvalidXLogRecPtr;

	/* Recreate error queues (if they exist). */
	if (pcxt->nworkers > 0)
	{
		char	   *error_queue_space;
		int			i;

		error_queue_space =
			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
560 : :
561 : : /*
562 : : * Reinitialize parallel workers for a parallel context such that we could
563 : : * launch a different number of workers. This is required for cases where
564 : : * we need to reuse the same DSM segment, but the number of workers can
565 : : * vary from run-to-run.
566 : : */
567 : : void
2246 akapila@postgresql.o 568 : 23 : ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
569 : : {
570 : : /*
571 : : * The number of workers that need to be launched must be less than the
572 : : * number of workers with which the parallel context is initialized. But
573 : : * the caller might not know that InitializeParallelDSM reduced nworkers,
574 : : * so just silently trim the request.
575 : : */
492 tgl@sss.pgh.pa.us 576 : 23 : pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
2246 akapila@postgresql.o 577 : 23 : }
578 : :
/*
 * Launch parallel workers.
 *
 * Registers up to pcxt->nworkers_to_launch background workers, all running
 * ParallelWorkerMain with this context's DSM handle as argument.  The
 * caller must tolerate ending up with fewer workers than requested;
 * pcxt->nworkers_launched reflects how many registrations succeeded.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Configure a worker.  The same template is reused for every slot. */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers_to_launch; ++i)
	{
		/* Worker number is passed via bgw_extra. */
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/*
	 * Now that nworkers_launched has taken its final value, we can initialize
	 * known_attached_workers.
	 */
	if (pcxt->nworkers_launched > 0)
	{
		pcxt->known_attached_workers = palloc0_array(bool, pcxt->nworkers_launched);
		pcxt->nknown_attached_workers = 0;
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
668 : :
669 : : /*
670 : : * Wait for all workers to attach to their error queues, and throw an error if
671 : : * any worker fails to do this.
672 : : *
673 : : * Callers can assume that if this function returns successfully, then the
674 : : * number of workers given by pcxt->nworkers_launched have initialized and
675 : : * attached to their error queues. Whether or not these workers are guaranteed
676 : : * to still be running depends on what code the caller asked them to run;
677 : : * this function does not guarantee that they have not exited. However, it
678 : : * does guarantee that any workers which exited must have done so cleanly and
679 : : * after successfully performing the work with which they were tasked.
680 : : *
681 : : * If this function is not called, then some of the workers that were launched
682 : : * may not have been started due to a fork() failure, or may have exited during
683 : : * early startup prior to attaching to the error queue, so nworkers_launched
684 : : * cannot be viewed as completely reliable. It will never be less than the
685 : : * number of workers which actually started, but it might be more. Any workers
686 : : * that failed to start will still be discovered by
687 : : * WaitForParallelWorkersToFinish and an error will be thrown at that time,
688 : : * provided that function is eventually reached.
689 : : *
690 : : * In general, the leader process should do as much work as possible before
691 : : * calling this function. fork() failures and other early-startup failures
692 : : * are very uncommon, and having the leader sit idle when it could be doing
693 : : * useful work is undesirable. However, if the leader needs to wait for
694 : : * all of its workers or for a specific worker, it may want to call this
695 : : * function before doing so. If not, it must make some other provision for
696 : : * the failure-to-start case, lest it wait forever. On the other hand, a
697 : : * leader which never waits for a worker that might not be started yet, or
698 : : * at least never does so prior to WaitForParallelWorkersToFinish(), need not
699 : : * call this function at all.
700 : : */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
	int			i;

	/* Skip this if we have no launched workers. */
	if (pcxt->nworkers_launched == 0)
		return;

	/* Outer loop: sleep and re-poll until every worker is accounted for. */
	for (;;)
	{
		/*
		 * This will process any parallel messages that are pending and it may
		 * also throw an error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			BgwHandleStatus status;
			shm_mq	   *mq;
			int			rc;
			pid_t		pid;

			/* Already proven attached on an earlier pass; nothing to do. */
			if (pcxt->known_attached_workers[i])
				continue;

			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
			{
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
				continue;
			}

			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
			if (status == BGWH_STARTED)
			{
				/* Has the worker attached to the error queue? */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) != NULL)
				{
					/* Yes, so it is known to be attached. */
					pcxt->known_attached_workers[i] = true;
					++pcxt->nknown_attached_workers;
				}
			}
			else if (status == BGWH_STOPPED)
			{
				/*
				 * If the worker stopped without attaching to the error queue,
				 * throw an error.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * Stopped but it did attach first, so it must have run and
				 * exited; count it as attached.
				 */
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
			}
			else
			{
				/*
				 * Worker not yet started, so we must wait.  The postmaster
				 * will notify us if the worker's state changes.  Our latch
				 * might also get set for some other reason, but if so we'll
				 * just end up waiting for the same worker again.
				 */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							   -1, WAIT_EVENT_BGWORKER_STARTUP);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);
			}
		}

		/* If all workers are known to have started, we're done. */
		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
		{
			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
			break;
		}
	}
}
792 : :
793 : : /*
794 : : * Wait for all workers to finish computing.
795 : : *
796 : : * Even if the parallel operation seems to have completed successfully, it's
797 : : * important to call this function afterwards. We must not miss any errors
798 : : * the workers may have thrown during the parallel operation, or any that they
799 : : * may yet throw while shutting down.
800 : : *
801 : : * Also, we want to update our notion of XactLastRecEnd based on worker
802 : : * feedback.
803 : : */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->known_attached_workers[i])
			{
				/* Attached and not yet terminated: no point checking more. */
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		/* Sleep until something happens (worker message or state change). */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
						 WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	/*
	 * Adopt the workers' WAL feedback: per the header comment, advance our
	 * XactLastRecEnd to cover WAL written by the workers.
	 */
	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}
909 : :
910 : : /*
911 : : * Wait for all workers to exit.
912 : : *
913 : : * This function ensures that workers have been completely shutdown. The
914 : : * difference between WaitForParallelWorkersToFinish and this function is
915 : : * that the former just ensures that last message sent by a worker backend is
916 : : * received by the leader backend whereas this ensures the complete shutdown.
917 : : */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
	int			i;

	/* Wait until the workers actually die. */
	for (i = 0; i < pcxt->nworkers_launched; ++i)
	{
		BgwHandleStatus status;

		/* No handle means there is nothing to wait for. */
		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
			continue;

		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

		/*
		 * If the postmaster kicked the bucket, we have no chance of cleaning
		 * up safely -- we won't be able to tell when our workers are actually
		 * dead.  This doesn't necessitate a PANIC since they will all abort
		 * eventually, but we can't safely continue this session.
		 */
		if (status == BGWH_POSTMASTER_DIED)
			ereport(FATAL,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("postmaster exited during a parallel transaction")));

		/* Release memory.  Clearing the pointer also makes re-entry a no-op. */
		pfree(pcxt->worker[i].bgwhandle);
		pcxt->worker[i].bgwhandle = NULL;
	}
}
949 : :
950 : : /*
951 : : * Destroy a parallel context.
952 : : *
953 : : * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
954 : : * first, before calling this function. When this function is invoked, any
955 : : * remaining workers are forcibly killed; the dynamic shared memory segment
956 : : * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
957 : : */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/*
	 * Kill each worker in turn, and forget their error queues.  (A NULL
	 * error_mqh means the worker already sent Terminate and its queue was
	 * detached in ProcessParallelMessage.)
	 */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/* Free the worker array itself. */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}
1028 : :
1029 : : /*
1030 : : * Are there any parallel contexts currently active?
1031 : : */
1032 : : bool
1033 : 334686 : ParallelContextActive(void)
1034 : : {
1035 : 334686 : return !dlist_is_empty(&pcxt_list);
1036 : : }
1037 : :
1038 : : /*
1039 : : * Handle receipt of an interrupt indicating a parallel worker message.
1040 : : *
1041 : : * Note: this is called within a signal handler! All we can do is set
1042 : : * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
1043 : : * ProcessParallelMessages().
1044 : : */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;	/* make CHECK_FOR_INTERRUPTS() fire */
	ParallelMessagePending = true;	/* route it to ProcessParallelMessages() */
	SetLatch(MyLatch);			/* wake ourselves if blocked in WaitLatch() */
}
1052 : :
1053 : : /*
1054 : : * Process any queued protocol messages received from parallel workers.
1055 : : */
void
ProcessParallelMessages(void)
{
	dlist_iter	iter;
	MemoryContext oldcontext;

	/* Private work context, created once and reused across calls. */
	static MemoryContext hpm_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs.  It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"ProcessParallelMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	ParallelMessagePending = false;

	/* Poll every worker of every active parallel context. */
	dlist_foreach(iter, &pcxt_list)
	{
		ParallelContext *pcxt;
		int			i;

		pcxt = dlist_container(ParallelContext, node, iter.cur);
		if (pcxt->worker == NULL)
			continue;

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * Read as many messages as we can from each worker, but stop when
			 * either (1) the worker's error queue goes away, which can happen
			 * if we receive a Terminate message from the worker; or (2) no
			 * more messages can be read from the worker without blocking.
			 */
			while (pcxt->worker[i].error_mqh != NULL)
			{
				shm_mq_result res;
				Size		nbytes;
				void	   *data;

				/* nowait = true: never block here, just drain what's ready */
				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					/* Copy into our context; "data" lives in the queue. */
					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					ProcessParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}
1141 : :
1142 : : /*
1143 : : * Process a single protocol message received from a single parallel worker.
1144 : : */
static void
ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;

	/*
	 * Receiving any message at all proves that worker i attached to its
	 * error queue, so record that if we hadn't noticed already.
	 */
	if (pcxt->known_attached_workers != NULL &&
		!pcxt->known_attached_workers[i])
	{
		pcxt->known_attached_workers[i] = true;
		pcxt->nknown_attached_workers++;
	}

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case PqMsg_ErrorResponse:
		case PqMsg_NoticeResponse:
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;

				/*
				 * Rethrow error or print notice.  (If this is an ERROR,
				 * ThrowErrorData() does not return here.)
				 */
				ThrowErrorData(&edata);

				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;

				break;
			}

		case PqMsg_NotificationResponse:
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;

				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);

				NotifyMyFrontEnd(channel, payload, pid);

				break;
			}

		case PqMsg_Progress:
			{
				/*
				 * Only incremental progress reporting is currently supported.
				 * However, it's possible to add more fields to the message to
				 * allow for handling of other backend progress APIs.
				 */
				int			index = pq_getmsgint(msg, 4);
				int64		incr = pq_getmsgint64(msg);

				pq_getmsgend(msg);

				pgstat_progress_incr_param(index, incr);

				break;
			}

		case PqMsg_Terminate:
			{
				/* Worker is done sending; detach and forget its queue. */
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}
1255 : :
1256 : : /*
1257 : : * End-of-subtransaction cleanup for parallel contexts.
1258 : : *
1259 : : * Here we remove only parallel contexts initiated within the current
1260 : : * subtransaction.
1261 : : */
1262 : : void
1263 : 11685 : AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1264 : : {
1265 [ + + ]: 11688 : while (!dlist_is_empty(&pcxt_list))
1266 : : {
1267 : : ParallelContext *pcxt;
1268 : :
1269 : 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1270 [ - + ]: 3 : if (pcxt->subid != mySubId)
3972 rhaas@postgresql.org 1271 :UBC 0 : break;
3972 rhaas@postgresql.org 1272 [ - + ]:CBC 3 : if (isCommit)
3972 rhaas@postgresql.org 1273 [ # # ]:UBC 0 : elog(WARNING, "leaked parallel context");
3972 rhaas@postgresql.org 1274 :CBC 3 : DestroyParallelContext(pcxt);
1275 : : }
1276 : 11685 : }
1277 : :
1278 : : /*
1279 : : * End-of-transaction cleanup for parallel contexts.
1280 : : *
1281 : : * We nuke all remaining parallel contexts.
1282 : : */
1283 : : void
1284 : 337011 : AtEOXact_Parallel(bool isCommit)
1285 : : {
1286 [ + + ]: 337014 : while (!dlist_is_empty(&pcxt_list))
1287 : : {
1288 : : ParallelContext *pcxt;
1289 : :
1290 : 3 : pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1291 [ - + ]: 3 : if (isCommit)
3972 rhaas@postgresql.org 1292 [ # # ]:UBC 0 : elog(WARNING, "leaked parallel context");
3972 rhaas@postgresql.org 1293 :CBC 3 : DestroyParallelContext(pcxt);
1294 : : }
1295 : 337011 : }
1296 : :
1297 : : /*
1298 : : * Main entrypoint for parallel workers.
1299 : : */
1300 : : void
1301 : 1491 : ParallelWorkerMain(Datum main_arg)
1302 : : {
1303 : : dsm_segment *seg;
1304 : : shm_toc *toc;
1305 : : FixedParallelState *fps;
1306 : : char *error_queue_space;
1307 : : shm_mq *mq;
1308 : : shm_mq_handle *mqh;
1309 : : char *libraryspace;
1310 : : char *entrypointstate;
1311 : : char *library_name;
1312 : : char *function_name;
1313 : : parallel_worker_main_type entrypt;
1314 : : char *gucspace;
1315 : : char *combocidspace;
1316 : : char *tsnapspace;
1317 : : char *asnapspace;
1318 : : char *tstatespace;
1319 : : char *pendingsyncsspace;
1320 : : char *reindexspace;
1321 : : char *relmapperspace;
1322 : : char *uncommittedenumsspace;
1323 : : char *clientconninfospace;
1324 : : char *session_dsm_handle_space;
1325 : : Snapshot tsnapshot;
1326 : : Snapshot asnapshot;
1327 : :
1328 : : /* Set flag to indicate that we're initializing a parallel worker. */
3803 1329 : 1491 : InitializingParallelWorker = true;
1330 : :
1331 : : /* Establish signal handlers. */
3972 1332 : 1491 : BackgroundWorkerUnblockSignals();
1333 : :
1334 : : /* Determine and set our parallel worker number. */
3783 1335 [ - + ]: 1491 : Assert(ParallelWorkerNumber == -1);
1336 : 1491 : memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
1337 : :
1338 : : /* Set up a memory context to work in, just for cleanliness. */
3972 1339 : 1491 : CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
1340 : : "Parallel worker",
1341 : : ALLOCSET_DEFAULT_SIZES);
1342 : :
1343 : : /*
1344 : : * Attach to the dynamic shared memory segment for the parallel query, and
1345 : : * find its table of contents.
1346 : : *
1347 : : * Note: at this point, we have not created any ResourceOwner in this
1348 : : * process. This will result in our DSM mapping surviving until process
1349 : : * exit, which is fine. If there were a ResourceOwner, it would acquire
1350 : : * ownership of the mapping, but we have no need for that.
1351 : : */
1352 : 1491 : seg = dsm_attach(DatumGetUInt32(main_arg));
1353 [ - + ]: 1491 : if (seg == NULL)
3972 rhaas@postgresql.org 1354 [ # # ]:UBC 0 : ereport(ERROR,
1355 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1356 : : errmsg("could not map dynamic shared memory segment")));
3972 rhaas@postgresql.org 1357 :CBC 1491 : toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
1358 [ - + ]: 1491 : if (toc == NULL)
3972 rhaas@postgresql.org 1359 [ # # ]:UBC 0 : ereport(ERROR,
1360 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1361 : : errmsg("invalid magic number in dynamic shared memory segment")));
1362 : :
1363 : : /* Look up fixed parallel state. */
3205 tgl@sss.pgh.pa.us 1364 :CBC 1491 : fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
3972 rhaas@postgresql.org 1365 : 1491 : MyFixedParallelState = fps;
1366 : :
1367 : : /* Arrange to signal the leader if we exit. */
2100 andres@anarazel.de 1368 : 1491 : ParallelLeaderPid = fps->parallel_leader_pid;
742 heikki.linnakangas@i 1369 : 1491 : ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
1810 andres@anarazel.de 1370 : 1491 : before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));
1371 : :
1372 : : /*
1373 : : * Now we can find and attach to the error queue provided for us. That's
1374 : : * good, because until we do that, any errors that happen here will not be
1375 : : * reported back to the process that requested that this worker be
1376 : : * launched.
1377 : : */
3205 tgl@sss.pgh.pa.us 1378 : 1491 : error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
3972 rhaas@postgresql.org 1379 : 1491 : mq = (shm_mq *) (error_queue_space +
3949 bruce@momjian.us 1380 : 1491 : ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
3972 rhaas@postgresql.org 1381 : 1491 : shm_mq_set_sender(mq, MyProc);
1382 : 1491 : mqh = shm_mq_attach(mq, seg, NULL);
3803 1383 : 1491 : pq_redirect_to_shm_mq(seg, mqh);
2100 andres@anarazel.de 1384 : 1491 : pq_set_parallel_leader(fps->parallel_leader_pid,
1385 : : fps->parallel_leader_proc_number);
1386 : :
1387 : : /*
1388 : : * Hooray! Primary initialization is complete. Now, we need to set up our
1389 : : * backend-local state to match the original backend.
1390 : : */
1391 : :
1392 : : /*
1393 : : * Join locking group. We must do this before anything that could try to
1394 : : * acquire a heavyweight lock, because any heavyweight locks acquired to
1395 : : * this point could block either directly against the parallel group
1396 : : * leader or against some process which in turn waits for a lock that
1397 : : * conflicts with the parallel group leader, causing an undetected
1398 : : * deadlock. (If we can't join the lock group, the leader has gone away,
1399 : : * so just exit quietly.)
1400 : : */
1401 [ - + ]: 1491 : if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
1402 : : fps->parallel_leader_pid))
3689 rhaas@postgresql.org 1403 :UBC 0 : return;
1404 : :
1405 : : /*
1406 : : * Restore transaction and statement start-time timestamps. This must
1407 : : * happen before anything that would start a transaction, else asserts in
1408 : : * xact.c will fire.
1409 : : */
2717 tgl@sss.pgh.pa.us 1410 :CBC 1491 : SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);
1411 : :
1412 : : /*
1413 : : * Identify the entry point to be called. In theory this could result in
1414 : : * loading an additional library, though most likely the entry point is in
1415 : : * the core backend or in a library we just loaded.
1416 : : */
3205 1417 : 1491 : entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
3257 1418 : 1491 : library_name = entrypointstate;
1419 : 1491 : function_name = entrypointstate + strlen(library_name) + 1;
1420 : :
1421 : 1491 : entrypt = LookupParallelWorkerFunction(library_name, function_name);
1422 : :
1423 : : /*
1424 : : * Restore current session authorization and role id. No verification
1425 : : * happens here, we just blindly adopt the leader's state. Note that this
1426 : : * has to happen before InitPostgres, since InitializeSessionUserId will
1427 : : * not set these variables.
1428 : : */
489 1429 : 1491 : SetAuthenticatedUserId(fps->authenticated_user_id);
1430 : 1491 : SetSessionAuthorization(fps->session_user_id,
1431 : 1491 : fps->session_user_is_superuser);
1432 : 1491 : SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);
1433 : :
1434 : : /*
1435 : : * Restore database connection. We skip connection authorization checks,
1436 : : * reasoning that (a) the leader checked these things when it started, and
1437 : : * (b) we do not want parallel mode to cause these failures, because that
1438 : : * would make use of parallel query plans not transparent to applications.
1439 : : */
3972 rhaas@postgresql.org 1440 : 1491 : BackgroundWorkerInitializeConnectionByOid(fps->database_id,
1441 : : fps->authenticated_user_id,
1442 : : BGWORKER_BYPASS_ALLOWCONN |
1443 : : BGWORKER_BYPASS_ROLELOGINCHECK);
1444 : :
1445 : : /*
1446 : : * Set the client encoding to the database encoding, since that is what
1447 : : * the leader will expect. (We're cheating a bit by not calling
1448 : : * PrepareClientEncoding first. It's okay because this call will always
1449 : : * result in installing a no-op conversion. No error should be possible,
1450 : : * but check anyway.)
1451 : : */
586 tgl@sss.pgh.pa.us 1452 [ - + ]: 1491 : if (SetClientEncoding(GetDatabaseEncoding()) < 0)
586 tgl@sss.pgh.pa.us 1453 [ # # ]:UBC 0 : elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());
1454 : :
1455 : : /*
1456 : : * Load libraries that were loaded by original backend. We want to do
1457 : : * this before restoring GUCs, because the libraries might define custom
1458 : : * variables.
1459 : : */
2733 tmunro@postgresql.or 1460 :CBC 1491 : libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
1461 : 1491 : StartTransactionCommand();
1462 : 1491 : RestoreLibraryState(libraryspace);
3972 rhaas@postgresql.org 1463 : 1491 : CommitTransactionCommand();
1464 : :
1465 : : /* Crank up a transaction state appropriate to a parallel worker. */
3205 tgl@sss.pgh.pa.us 1466 : 1491 : tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
3972 rhaas@postgresql.org 1467 : 1491 : StartParallelWorkerTransaction(tstatespace);
1468 : :
1469 : : /*
1470 : : * Restore state that affects catalog access. Ideally we'd do this even
1471 : : * before calling InitPostgres, but that has order-of-initialization
1472 : : * problems, and also the relmapper would get confused during the
1473 : : * CommitTransactionCommand call above.
1474 : : */
507 noah@leadboat.com 1475 : 1491 : pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
1476 : : false);
1477 : 1491 : RestorePendingSyncs(pendingsyncsspace);
542 tgl@sss.pgh.pa.us 1478 : 1491 : relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
1479 : 1491 : RestoreRelationMap(relmapperspace);
1480 : 1491 : reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
1481 : 1491 : RestoreReindexState(reindexspace);
3205 1482 : 1491 : combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
3972 rhaas@postgresql.org 1483 : 1491 : RestoreComboCIDState(combocidspace);
1484 : :
1485 : : /* Attach to the per-session DSM segment and contained objects. */
1486 : : session_dsm_handle_space =
3104 andres@anarazel.de 1487 : 1491 : shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
1488 : 1491 : AttachSession(*(dsm_handle *) session_dsm_handle_space);
1489 : :
1490 : : /*
1491 : : * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
1492 : : * the leader has serialized the transaction snapshot and we must restore
1493 : : * it. At lower isolation levels, there is no transaction-lifetime
1494 : : * snapshot, but we need TransactionXmin to get set to a value which is
1495 : : * less than or equal to the xmin of every snapshot that will be used by
1496 : : * this worker. The easiest way to accomplish that is to install the
1497 : : * active snapshot as the transaction snapshot. Code running in this
1498 : : * parallel worker might take new snapshots via GetTransactionSnapshot()
1499 : : * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
1500 : : * snapshot older than the active snapshot.
1501 : : */
3205 tgl@sss.pgh.pa.us 1502 : 1491 : asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
1663 rhaas@postgresql.org 1503 : 1491 : tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
1504 : 1491 : asnapshot = RestoreSnapshot(asnapspace);
1505 [ + + ]: 1491 : tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
1506 : 1491 : RestoreTransactionSnapshot(tsnapshot,
1507 : 1491 : fps->parallel_leader_pgproc);
1508 : 1491 : PushActiveSnapshot(asnapshot);
1509 : :
1510 : : /*
1511 : : * We've changed which tuples we can see, and must therefore invalidate
1512 : : * system caches.
1513 : : */
3803 1514 : 1491 : InvalidateSystemCaches();
1515 : :
1516 : : /*
1517 : : * Restore GUC values from launching backend. We can't do this earlier,
1518 : : * because GUC check hooks that do catalog lookups need to see the same
1519 : : * database state as the leader. Also, the check hooks for
1520 : : * session_authorization and role assume we already set the correct role
1521 : : * OIDs.
1522 : : */
586 tgl@sss.pgh.pa.us 1523 : 1491 : gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
1524 : 1491 : RestoreGUCState(gucspace);
1525 : :
1526 : : /*
1527 : : * Restore current user ID and security context. No verification happens
1528 : : * here, we just blindly adopt the leader's state. We can't do this till
1529 : : * after restoring GUCs, else we'll get complaints about restoring
1530 : : * session_authorization and role. (In effect, we're assuming that all
1531 : : * the restored values are okay to set, even if we are now inside a
1532 : : * restricted context.)
1533 : : */
3972 rhaas@postgresql.org 1534 : 1491 : SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
1535 : :
1536 : : /* Restore temp-namespace state to ensure search path matches leader's. */
3566 tgl@sss.pgh.pa.us 1537 : 1491 : SetTempNamespaceState(fps->temp_namespace_id,
1538 : : fps->temp_toast_namespace_id);
1539 : :
1540 : : /* Restore uncommitted enums. */
1895 tmunro@postgresql.or 1541 : 1491 : uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
1542 : : false);
1543 : 1491 : RestoreUncommittedEnums(uncommittedenumsspace);
1544 : :
1545 : : /* Restore the ClientConnectionInfo. */
1299 michael@paquier.xyz 1546 : 1491 : clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
1547 : : false);
1548 : 1491 : RestoreClientConnectionInfo(clientconninfospace);
1549 : :
1550 : : /*
1551 : : * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
1552 : : * ensure that auth_method is actually valid, aka authn_id is not NULL.
1553 : : */
1263 1554 [ + + ]: 1491 : if (MyClientConnectionInfo.authn_id)
1555 : 4 : InitializeSystemUser(MyClientConnectionInfo.authn_id,
1556 : : hba_authname(MyClientConnectionInfo.auth_method));
1557 : :
1558 : : /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
2557 tmunro@postgresql.or 1559 : 1491 : AttachSerializableXact(fps->serializable_xact_handle);
1560 : :
1561 : : /*
1562 : : * We've initialized all of our state now; nothing should change
1563 : : * hereafter.
1564 : : */
3803 rhaas@postgresql.org 1565 : 1491 : InitializingParallelWorker = false;
3972 1566 : 1491 : EnterParallelMode();
1567 : :
1568 : : /*
1569 : : * Time to do the real work: invoke the caller-supplied code.
1570 : : */
3257 tgl@sss.pgh.pa.us 1571 : 1491 : entrypt(seg, toc);
1572 : :
1573 : : /* Must exit parallel mode to pop active snapshot. */
3972 rhaas@postgresql.org 1574 : 1485 : ExitParallelMode();
1575 : :
1576 : : /* Must pop active snapshot so snapmgr.c doesn't complain. */
1577 : 1485 : PopActiveSnapshot();
1578 : :
1579 : : /* Shut down the parallel-worker transaction. */
1580 : 1485 : EndParallelWorkerTransaction();
1581 : :
1582 : : /* Detach from the per-session DSM segment. */
3104 andres@anarazel.de 1583 : 1485 : DetachSession();
1584 : :
1585 : : /* Report success. */
936 nathan@postgresql.or 1586 : 1485 : pq_putmessage(PqMsg_Terminate, NULL, 0);
1587 : : }
1588 : :
1589 : : /*
1590 : : * Update shared memory with the ending location of the last WAL record we
1591 : : * wrote, if it's greater than the value already stored there.
1592 : : */
1593 : : void
3972 rhaas@postgresql.org 1594 : 1485 : ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1595 : : {
1596 : 1485 : FixedParallelState *fps = MyFixedParallelState;
1597 : :
1598 [ - + ]: 1485 : Assert(fps != NULL);
1599 [ + + ]: 1485 : SpinLockAcquire(&fps->mutex);
1600 [ + + ]: 1485 : if (fps->last_xlog_end < last_xlog_end)
1601 : 118 : fps->last_xlog_end = last_xlog_end;
1602 : 1485 : SpinLockRelease(&fps->mutex);
1603 : 1485 : }
1604 : :
1605 : : /*
1606 : : * Make sure the leader tries to read from our error queue one more time.
1607 : : * This guards against the case where we exit uncleanly without sending an
1608 : : * ErrorResponse to the leader, for example because some code calls proc_exit
1609 : : * directly.
1610 : : *
1611 : : * Also explicitly detach from dsm segment so that subsystems using
1612 : : * on_dsm_detach() have a chance to send stats before the stats subsystem is
1613 : : * shut down as part of a before_shmem_exit() hook.
1614 : : *
1615 : : * One might think this could instead be solved by carefully ordering the
1616 : : * attaching to dsm segments, so that the pgstats segments get detached from
1617 : : * later than the parallel query one. That turns out to not work because the
1618 : : * stats hash might need to grow which can cause new segments to be allocated,
1619 : : * which then will be detached from earlier.
1620 : : */
1621 : : static void
2973 1622 : 1491 : ParallelWorkerShutdown(int code, Datum arg)
1623 : : {
2100 andres@anarazel.de 1624 : 1491 : SendProcSignal(ParallelLeaderPid,
1625 : : PROCSIG_PARALLEL_MESSAGE,
1626 : : ParallelLeaderProcNumber);
1627 : :
1810 1628 : 1491 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
2973 rhaas@postgresql.org 1629 : 1491 : }
1630 : :
1631 : : /*
1632 : : * Look up (and possibly load) a parallel worker entry point function.
1633 : : *
1634 : : * For functions contained in the core code, we use library name "postgres"
1635 : : * and consult the InternalParallelWorkers array. External functions are
1636 : : * looked up, and loaded if necessary, using load_external_function().
1637 : : *
1638 : : * The point of this is to pass function names as strings across process
1639 : : * boundaries. We can't pass actual function addresses because of the
1640 : : * possibility that the function has been loaded at a different address
1641 : : * in a different process. This is obviously a hazard for functions in
1642 : : * loadable libraries, but it can happen even for functions in the core code
1643 : : * on platforms using EXEC_BACKEND (e.g., Windows).
1644 : : *
1645 : : * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1646 : : * in favor of applying load_external_function() for core functions too;
1647 : : * but that raises portability issues that are not worth addressing now.
1648 : : */
1649 : : static parallel_worker_main_type
3257 tgl@sss.pgh.pa.us 1650 : 1491 : LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1651 : : {
1652 : : /*
1653 : : * If the function is to be loaded from postgres itself, search the
1654 : : * InternalParallelWorkers array.
1655 : : */
1656 [ + - ]: 1491 : if (strcmp(libraryname, "postgres") == 0)
1657 : : {
1658 : : int i;
1659 : :
1660 [ + - ]: 1782 : for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1661 : : {
1662 [ + + ]: 1782 : if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1663 : 1491 : return InternalParallelWorkers[i].fn_addr;
1664 : : }
1665 : :
1666 : : /* We can only reach this by programming error. */
3257 tgl@sss.pgh.pa.us 1667 [ # # ]:UBC 0 : elog(ERROR, "internal function \"%s\" not found", funcname);
1668 : : }
1669 : :
1670 : : /* Otherwise load from external library. */
1671 : 0 : return (parallel_worker_main_type)
1672 : 0 : load_external_function(libraryname, funcname, true, NULL);
1673 : : }
|