Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * applyparallelworker.c
3 : : * Support routines for applying xact by parallel apply worker
4 : : *
5 : : * Copyright (c) 2023-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/applyparallelworker.c
9 : : *
10 : : * This file contains the code to launch, set up, and tear down a parallel apply
11 : : * worker which receives the changes from the leader worker and invokes routines
12 : : * to apply those on the subscriber database. Additionally, this file contains
13 : : * routines that are intended to support setting up, using, and tearing down a
14 : : * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15 : : * apply workers can communicate with each other.
16 : : *
17 : : * The parallel apply workers are assigned (if available) as soon as xact's
18 : : * first stream is received for subscriptions that have set their 'streaming'
19 : : * option as parallel. The leader apply worker will send changes to this new
20 : : * worker via shared memory. We keep this worker assigned till the transaction
21 : : * commit is received and also wait for the worker to finish at commit. This
22 : : * preserves commit ordering and avoids file I/O in most cases, although we
23 : : * still need to spill to a file if there is no worker available. See comments
24 : : * atop logical/worker.c to learn more about streamed xacts whose changes are
25 : : * spilled to disk. It is important to maintain commit order to avoid failures
26 : : * due to: (a) transaction dependencies - say if we insert a row in the first
27 : : * transaction and update it in the second transaction on publisher then
28 : : * allowing the subscriber to apply both in parallel can lead to failure in the
29 : : * update; (b) deadlocks - allowing transactions that update the same set of
30 : : * rows/tables in the opposite order to be applied in parallel can lead to
31 : : * deadlocks.
32 : : *
33 : : * A worker pool is used to avoid restarting workers for each streaming
34 : : * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35 : : * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36 : : * its information is added to the ParallelApplyWorkerPool. Once the worker
37 : : * finishes applying the transaction, it is marked as available for re-use.
38 : : * Now, before starting a new worker to apply the streaming transaction, we
39 : : * check the list for any available worker. Note that we retain a maximum of
40 : : * half the max_parallel_apply_workers_per_subscription workers in the pool and
41 : : * after that, we simply exit the worker after applying the transaction.
42 : : *
43 : : * XXX This worker pool threshold is arbitrary and we can provide a GUC
44 : : * variable for this in the future if required.
45 : : *
46 : : * The leader apply worker will create a separate dynamic shared memory segment
47 : : * when each parallel apply worker starts. The reason for this design is that
48 : : * we cannot predict how many workers will be needed. It may be possible to
49 : : * allocate enough shared memory in one segment based on the maximum number of
50 : : * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51 : : * this would waste memory if no process is actually started.
52 : : *
53 : : * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54 : : * send changes in the transaction from leader apply worker to parallel apply
55 : : * worker; (b) another shm_mq that is used to send errors (and other messages
56 : : * reported via elog/ereport) from the parallel apply worker to leader apply
57 : : * worker; (c) necessary information to be shared among parallel apply workers
58 : : * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
59 : : *
60 : : * Locking Considerations
61 : : * ----------------------
62 : : * We have a risk of deadlock due to concurrently applying the transactions in
63 : : * parallel mode that were independent on the publisher side but became
64 : : * dependent on the subscriber side due to the different database structures
65 : : * (like schema of subscription tables, constraints, etc.) on each side. This
66 : : * can happen even without parallel mode when there are concurrent operations
67 : : * on the subscriber. In order to detect the deadlocks among leader (LA) and
68 : : * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
69 : : * next stream (set of changes) and LA waits for PA to finish the transaction.
70 : : * An alternative approach could be to not allow parallelism when the schema of
71 : : * tables is different between the publisher and subscriber but that would be
72 : : * too restrictive and would require the publisher to send much more
73 : : * information than it is currently sending.
74 : : *
75 : : * Consider a case where the subscribed table does not have a unique key on the
76 : : * publisher and has a unique key on the subscriber. The deadlock can happen in
77 : : * the following ways:
78 : : *
79 : : * 1) Deadlock between the leader apply worker and a parallel apply worker
80 : : *
81 : : * Consider that the parallel apply worker (PA) is executing TX-1 and the
82 : : * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83 : : * Now, LA is waiting for PA because of the unique key constraint of the
84 : : * subscribed table while PA is waiting for LA to send the next stream of
85 : : * changes or transaction finish command message.
86 : : *
87 : : * In order for lmgr to detect this, we have LA acquire a session lock on the
88 : : * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89 : : * trying to receive the next stream of changes. Specifically, LA will acquire
90 : : * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91 : : * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92 : : * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93 : : * acquire the lock in AccessShare mode after processing STREAM_STOP and
94 : : * STREAM_ABORT (for subtransaction) and then release the lock immediately
95 : : * after acquiring it.
96 : : *
97 : : * The lock graph for the above example will look as follows:
98 : : * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99 : : * acquire the stream lock) -> LA
100 : : *
101 : : * This way, when PA is waiting for LA for the next stream of changes, we can
102 : : * have a wait-edge from PA to LA in lmgr, which will make us detect the
103 : : * deadlock between LA and PA.
104 : : *
105 : : * 2) Deadlock between the leader apply worker and parallel apply workers
106 : : *
107 : : * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108 : : * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109 : : * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110 : : * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111 : : * transaction in order to preserve the commit order. There is a deadlock among
112 : : * the three processes.
113 : : *
114 : : * In order for lmgr to detect this, we have PA acquire a session lock (this is
115 : : * a different lock from the one referred to in the previous case, see
116 : : * pa_lock_transaction()) on the transaction being applied and have LA wait on
117 : : * the lock before proceeding in the transaction finish commands. Specifically,
118 : : * PA will acquire this lock in AccessExclusive mode before executing the first
119 : : * message of the transaction and release it at the xact end. LA will acquire
120 : : * this lock in AccessShare mode at transaction finish commands and release it
121 : : * immediately.
122 : : *
123 : : * The lock graph for the above example will look as follows:
124 : : * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125 : : * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
126 : : * lock) -> LA
127 : : *
128 : : * This way when LA is waiting to finish the transaction end command to preserve
129 : : * the commit order, we will be able to detect deadlock, if any.
130 : : *
131 : : * One might think we can use XactLockTableWait(), but XactLockTableWait()
132 : : * considers PREPARED TRANSACTION as still in progress which means the lock
133 : : * won't be released even after the parallel apply worker has prepared the
134 : : * transaction.
135 : : *
136 : : * 3) Deadlock when the shm_mq buffer is full
137 : : *
138 : : * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions
139 : : * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140 : : * wait to send messages, and this wait doesn't appear in lmgr.
141 : : *
142 : : * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143 : : * the timeout is exceeded, the LA will serialize all the pending messages to
144 : : * a file and indicate to PA-2 that it needs to read that file for the remaining
145 : : * messages. Then LA will start waiting for commit as in the previous case
146 : : * which will detect deadlock if any. See pa_send_data() and
147 : : * enum TransApplyAction.
148 : : *
149 : : * Lock types
150 : : * ----------
151 : : * Both the stream lock and the transaction lock mentioned above are
152 : : * session-level locks because both locks could be acquired outside the
153 : : * transaction, and the stream lock in the leader needs to persist across
154 : : * transaction boundaries i.e. until the end of the streaming transaction.
155 : : *-------------------------------------------------------------------------
156 : : */
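
The second deadlock scenario in the header comment can be pictured as a small wait-for graph. The sketch below is only an illustration and not how lmgr's deadlock detector is implemented: it assumes each process waits on at most one other process, and the names `NO_WAIT`, `SketchProc`, and `sketch_has_deadlock` are hypothetical. It shows that the cycle LA -> PA-2 -> PA-1 -> LA is only visible once PA-1's wait for the next stream is represented as an edge, which is the role the stream lock plays.

```c
#include <stdbool.h>

/* Hypothetical marker meaning "this process is not waiting on anyone". */
#define NO_WAIT (-1)

/* Hypothetical process indexes for the scenario in the header comment. */
typedef enum SketchProc
{
	SKETCH_LA = 0,				/* leader apply worker */
	SKETCH_PA1 = 1,				/* parallel apply worker running TX-1 */
	SKETCH_PA2 = 2				/* parallel apply worker running TX-2 */
} SketchProc;

/*
 * Follow the wait-for edges starting at 'start'. waits_for[i] is the index
 * of the process that process i waits on, or NO_WAIT. Returns true if the
 * chain leads back to 'start', i.e. 'start' is part of a cycle.
 */
bool
sketch_has_deadlock(const int *waits_for, int nprocs, int start)
{
	int			cur = start;

	for (int steps = 0; steps < nprocs; steps++)
	{
		cur = waits_for[cur];
		if (cur == NO_WAIT)
			return false;		/* chain ends, no cycle through 'start' */
		if (cur == start)
			return true;		/* came back to where we began */
	}
	return false;
}
```

With the edges {LA -> PA-2, PA-1 -> LA, PA-2 -> PA-1} the function reports a cycle. If PA-1's entry is `NO_WAIT`, as it effectively would be if its wait for the leader were not expressed as a lock, the chain ends at PA-1 and no cycle is found.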
157 : :
158 : : #include "postgres.h"
159 : :
160 : : #include "libpq/pqformat.h"
161 : : #include "libpq/pqmq.h"
162 : : #include "pgstat.h"
163 : : #include "postmaster/interrupt.h"
164 : : #include "replication/logicallauncher.h"
165 : : #include "replication/logicalworker.h"
166 : : #include "replication/origin.h"
167 : : #include "replication/worker_internal.h"
168 : : #include "storage/ipc.h"
169 : : #include "storage/lmgr.h"
170 : : #include "tcop/tcopprot.h"
171 : : #include "utils/inval.h"
172 : : #include "utils/memutils.h"
173 : : #include "utils/syscache.h"
174 : :
175 : : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
176 : :
177 : : /*
178 : : * DSM keys for parallel apply worker. Unlike other parallel execution code,
179 : : * since we don't need to worry about DSM keys conflicting with plan_node_id we
180 : : * can use small integers.
181 : : */
182 : : #define PARALLEL_APPLY_KEY_SHARED 1
183 : : #define PARALLEL_APPLY_KEY_MQ 2
184 : : #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3
185 : :
186 : : /* Queue size of DSM, 16 MB for now. */
187 : : #define DSM_QUEUE_SIZE (16 * 1024 * 1024)
188 : :
189 : : /*
190 : : * Error queue size of DSM. It is desirable to make it large enough that a
191 : : * typical ErrorResponse can be sent without blocking. That way, a worker that
192 : : * errors out can write the whole message into the queue and terminate without
193 : : * waiting for the user backend.
194 : : */
195 : : #define DSM_ERROR_QUEUE_SIZE (16 * 1024)
196 : :
197 : : /*
198 : : * There are three fields in each message received by the parallel apply
199 : : * worker: start_lsn, end_lsn and send_time. Because we have updated these
200 : : * statistics in the leader apply worker, we can ignore these fields in the
201 : : * parallel apply worker (see function LogicalRepApplyLoop).
202 : : */
203 : : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
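
As a rough illustration of what skipping these three fields amounts to, the sketch below advances a read cursor past them. It is a standalone approximation, not the PostgreSQL code: the typedefs are stand-ins that assume 64-bit `XLogRecPtr` and `TimestampTz`, and `SketchMsgCursor` and `sketch_skip_stats_fields` are hypothetical names. Unlike the worker loop, which simply adds `SIZE_STATS_MESSAGE` to the cursor, this sketch also checks that the message is long enough.

```c
#include <stddef.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL typedefs (assumed to be 64-bit here). */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;

#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/* Hypothetical read cursor over a received message buffer. */
typedef struct SketchMsgCursor
{
	const unsigned char *data;
	size_t		len;			/* total bytes in the message */
	size_t		cursor;			/* bytes consumed so far */
} SketchMsgCursor;

/*
 * Skip start_lsn, end_lsn and send_time. Returns 0 on success, or -1
 * (leaving the cursor untouched) if fewer than SIZE_STATS_MESSAGE bytes
 * remain.
 */
int
sketch_skip_stats_fields(SketchMsgCursor *msg)
{
	if (msg->cursor > msg->len ||
		msg->len - msg->cursor < SIZE_STATS_MESSAGE)
		return -1;

	msg->cursor += SIZE_STATS_MESSAGE;
	return 0;
}
```

Under the 64-bit assumption the macro evaluates to 24 bytes, so a cursor positioned just after the one-byte message type moves from offset 1 to offset 25.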
204 : :
205 : : /*
206 : : * The type of session-level lock on a transaction being applied on a logical
207 : : * replication subscriber.
208 : : */
209 : : #define PARALLEL_APPLY_LOCK_STREAM 0
210 : : #define PARALLEL_APPLY_LOCK_XACT 1
211 : :
212 : : /*
213 : : * Hash table entry to map xid to the parallel apply worker state.
214 : : */
215 : : typedef struct ParallelApplyWorkerEntry
216 : : {
217 : : TransactionId xid; /* Hash key -- must be first */
218 : : ParallelApplyWorkerInfo *winfo;
219 : : } ParallelApplyWorkerEntry;
220 : :
221 : : /*
222 : : * A hash table used to cache the state of streaming transactions being applied
223 : : * by the parallel apply workers.
224 : : */
225 : : static HTAB *ParallelApplyTxnHash = NULL;
226 : :
227 : : /*
228 : : * A list (pool) of active parallel apply workers. The information for
229 : : * the new worker is added to the list after successfully launching it. The
230 : : * list entry is removed if there are already enough workers in the worker
231 : : * pool at the end of the transaction. For more information about the worker
232 : : * pool, see comments atop this file.
233 : : */
234 : : static List *ParallelApplyWorkerPool = NIL;
235 : :
236 : : /*
237 : : * Information shared between leader apply worker and parallel apply worker.
238 : : */
239 : : ParallelApplyWorkerShared *MyParallelShared = NULL;
240 : :
241 : : /*
242 : : * Is there a message sent by a parallel apply worker that the leader apply
243 : : * worker needs to receive?
244 : : */
245 : : volatile sig_atomic_t ParallelApplyMessagePending = false;
246 : :
247 : : /*
248 : : * Cache the parallel apply worker information required for applying the
249 : : * current streaming transaction. It is used to save the cost of searching the
250 : : * hash table when applying the changes between STREAM_START and STREAM_STOP.
251 : : */
252 : : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
253 : :
254 : : /* A list to maintain subtransactions, if any. */
255 : : static List *subxactlist = NIL;
256 : :
257 : : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
258 : : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
259 : : static PartialFileSetState pa_get_fileset_state(void);
260 : :
261 : : /*
262 : : * Returns true if it is OK to start a parallel apply worker, false otherwise.
263 : : */
264 : : static bool
1023 akapila@postgresql.o 265 :CBC 82 : pa_can_start(void)
266 : : {
267 : : /* Only leader apply workers can start parallel apply workers. */
268 [ + + ]: 82 : if (!am_leader_apply_worker())
269 : 27 : return false;
270 : :
271 : : /*
272 : : * It is good to check for any change in the subscription parameter to
273 : : * avoid the case where for a very long time the change doesn't get
274 : : * reflected. This can happen when there is a constant flow of streaming
275 : : * transactions that are handled by parallel apply workers.
276 : : *
277 : : * It is better to do it before the below checks so that the latest values
278 : : * of subscription can be used for the checks.
279 : : */
280 : 55 : maybe_reread_subscription();
281 : :
282 : : /*
283 : : * Don't start a new parallel apply worker if the subscription is not
284 : : * using parallel streaming mode, or if the publisher does not support
285 : : * parallel apply.
286 : : */
287 [ + + ]: 55 : if (!MyLogicalRepWorker->parallel_apply)
288 : 28 : return false;
289 : :
290 : : /*
291 : : * Don't start a new parallel worker if the user has set skiplsn as it's
292 : : * possible that they want to skip the streaming transaction. For
293 : : * streaming transactions, we need to serialize the transaction to a file
294 : : * so that we can get the last LSN of the transaction to judge whether to
295 : : * skip before starting to apply the change.
296 : : *
297 : : * One might think that we could allow parallelism if the first lsn of the
298 : : * transaction is greater than skiplsn, but we don't send it with the
299 : : * STREAM START message, and it doesn't seem worth sending the extra eight
300 : : * bytes with the STREAM START to enable parallelism for this case.
301 : : */
302 [ - + ]: 27 : if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
1023 akapila@postgresql.o 303 :UBC 0 : return false;
304 : :
305 : : /*
306 : : * For streaming transactions that are being applied using a parallel
307 : : * apply worker, we cannot decide whether to apply the change for a
308 : : * relation that is not in the READY state (see
309 : : * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310 : : * time. So, we don't start the new parallel apply worker in this case.
311 : : */
1023 akapila@postgresql.o 312 [ - + ]:CBC 27 : if (!AllTablesyncsReady())
1023 akapila@postgresql.o 313 :UBC 0 : return false;
314 : :
1023 akapila@postgresql.o 315 :CBC 27 : return true;
316 : : }
317 : :
318 : : /*
319 : : * Set up a dynamic shared memory segment.
320 : : *
321 : : * We set up a control region that contains a fixed-size worker info
322 : : * (ParallelApplyWorkerShared), a message queue, and an error queue.
323 : : *
324 : : * Returns true on success, false on failure.
325 : : */
326 : : static bool
327 : 10 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
328 : : {
329 : : shm_toc_estimator e;
330 : : Size segsize;
331 : : dsm_segment *seg;
332 : : shm_toc *toc;
333 : : ParallelApplyWorkerShared *shared;
334 : : shm_mq *mq;
335 : 10 : Size queue_size = DSM_QUEUE_SIZE;
336 : 10 : Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
337 : :
338 : : /*
339 : : * Estimate how much shared memory we need.
340 : : *
341 : : * Because the TOC machinery may choose to insert padding of oddly-sized
342 : : * requests, we must estimate each chunk separately.
343 : : *
344 : : * We need one key to register the location of the header, and two other
345 : : * keys to track the locations of the message queue and the error message
346 : : * queue.
347 : : */
348 : 10 : shm_toc_initialize_estimator(&e);
349 : 10 : shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
350 : 10 : shm_toc_estimate_chunk(&e, queue_size);
351 : 10 : shm_toc_estimate_chunk(&e, error_queue_size);
352 : :
353 : 10 : shm_toc_estimate_keys(&e, 3);
354 : 10 : segsize = shm_toc_estimate(&e);
355 : :
356 : : /* Create the shared memory segment and establish a table of contents. */
357 : 10 : seg = dsm_create(shm_toc_estimate(&e), 0);
358 [ - + ]: 10 : if (!seg)
1023 akapila@postgresql.o 359 :UBC 0 : return false;
360 : :
1023 akapila@postgresql.o 361 :CBC 10 : toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
362 : : segsize);
363 : :
364 : : /* Set up the header region. */
365 : 10 : shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366 : 10 : SpinLockInit(&shared->mutex);
367 : :
368 : 10 : shared->xact_state = PARALLEL_TRANS_UNKNOWN;
369 : 10 : pg_atomic_init_u32(&(shared->pending_stream_count), 0);
370 : 10 : shared->last_commit_end = InvalidXLogRecPtr;
371 : 10 : shared->fileset_state = FS_EMPTY;
372 : :
373 : 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
374 : :
375 : : /* Set up message queue for the worker. */
376 : 10 : mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
377 : 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
378 : 10 : shm_mq_set_sender(mq, MyProc);
379 : :
380 : : /* Attach the queue. */
381 : 10 : winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382 : :
383 : : /* Set up error queue for the worker. */
384 : 10 : mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385 : : error_queue_size);
386 : 10 : shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
387 : 10 : shm_mq_set_receiver(mq, MyProc);
388 : :
389 : : /* Attach the queue. */
390 : 10 : winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391 : :
392 : : /* Return results to caller. */
393 : 10 : winfo->dsm_seg = seg;
394 : 10 : winfo->shared = shared;
395 : :
396 : 10 : return true;
397 : : }
398 : :
399 : : /*
400 : : * Try to get a parallel apply worker from the pool. If none is available then
401 : : * start a new one.
402 : : */
403 : : static ParallelApplyWorkerInfo *
404 : 27 : pa_launch_parallel_worker(void)
405 : : {
406 : : MemoryContext oldcontext;
407 : : bool launched;
408 : : ParallelApplyWorkerInfo *winfo;
409 : : ListCell *lc;
410 : :
411 : : /* Try to get an available parallel apply worker from the worker pool. */
412 [ + + + + : 29 : foreach(lc, ParallelApplyWorkerPool)
+ + ]
413 : : {
414 : 19 : winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415 : :
416 [ + + ]: 19 : if (!winfo->in_use)
417 : 17 : return winfo;
418 : : }
419 : :
420 : : /*
421 : : * Start a new parallel apply worker.
422 : : *
423 : : * The worker info can be used for the lifetime of the worker process, so
424 : : * create it in a permanent context.
425 : : */
426 : 10 : oldcontext = MemoryContextSwitchTo(ApplyContext);
427 : :
428 : 10 : winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
429 : :
430 : : /* Set up shared memory. */
431 [ - + ]: 10 : if (!pa_setup_dsm(winfo))
432 : : {
1023 akapila@postgresql.o 433 :UBC 0 : MemoryContextSwitchTo(oldcontext);
434 : 0 : pfree(winfo);
435 : 0 : return NULL;
436 : : }
437 : :
806 akapila@postgresql.o 438 :CBC 10 : launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
439 : 10 : MyLogicalRepWorker->dbid,
1023 440 : 10 : MySubscription->oid,
441 : 10 : MySubscription->name,
442 : 10 : MyLogicalRepWorker->userid,
443 : : InvalidOid,
444 : : dsm_segment_handle(winfo->dsm_seg),
445 : : false);
446 : :
447 [ + - ]: 10 : if (launched)
448 : : {
449 : 10 : ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
450 : : }
451 : : else
452 : : {
1023 akapila@postgresql.o 453 :UBC 0 : pa_free_worker_info(winfo);
454 : 0 : winfo = NULL;
455 : : }
456 : :
1023 akapila@postgresql.o 457 :CBC 10 : MemoryContextSwitchTo(oldcontext);
458 : :
459 : 10 : return winfo;
460 : : }
461 : :
462 : : /*
463 : : * Allocate a parallel apply worker that will be used for the specified xid.
464 : : *
465 : : * We first try to get an available worker from the pool, if any and then try
466 : : * to launch a new worker. On successful allocation, remember the worker
467 : : * information in the hash table so that we can get it later for processing the
468 : : * streaming changes.
469 : : */
470 : : void
471 : 82 : pa_allocate_worker(TransactionId xid)
472 : : {
473 : : bool found;
474 : 82 : ParallelApplyWorkerInfo *winfo = NULL;
475 : : ParallelApplyWorkerEntry *entry;
476 : :
477 [ + + ]: 82 : if (!pa_can_start())
478 : 55 : return;
479 : :
1019 480 : 27 : winfo = pa_launch_parallel_worker();
481 [ - + ]: 27 : if (!winfo)
1019 akapila@postgresql.o 482 :UBC 0 : return;
483 : :
484 : : /* First time through, initialize parallel apply worker state hashtable. */
1023 akapila@postgresql.o 485 [ + + ]:CBC 27 : if (!ParallelApplyTxnHash)
486 : : {
487 : : HASHCTL ctl;
488 : :
489 [ + - + - : 91 : MemSet(&ctl, 0, sizeof(ctl));
+ - + - +
+ ]
490 : 7 : ctl.keysize = sizeof(TransactionId);
491 : 7 : ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
492 : 7 : ctl.hcxt = ApplyContext;
493 : :
494 : 7 : ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
495 : : 16, &ctl,
496 : : HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
497 : : }
498 : :
499 : : /* Create an entry for the requested transaction. */
500 : 27 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
501 [ - + ]: 27 : if (found)
1023 akapila@postgresql.o 502 [ # # ]:UBC 0 : elog(ERROR, "hash table corrupted");
503 : :
504 : : /* Update the transaction information in shared memory. */
1023 akapila@postgresql.o 505 [ - + ]:CBC 27 : SpinLockAcquire(&winfo->shared->mutex);
506 : 27 : winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
507 : 27 : winfo->shared->xid = xid;
508 : 27 : SpinLockRelease(&winfo->shared->mutex);
509 : :
510 : 27 : winfo->in_use = true;
511 : 27 : winfo->serialize_changes = false;
512 : 27 : entry->winfo = winfo;
513 : : }
514 : :
515 : : /*
516 : : * Find the assigned worker for the given transaction, if any.
517 : : */
518 : : ParallelApplyWorkerInfo *
519 : 257354 : pa_find_worker(TransactionId xid)
520 : : {
521 : : bool found;
522 : : ParallelApplyWorkerEntry *entry;
523 : :
524 [ + + ]: 257354 : if (!TransactionIdIsValid(xid))
525 : 80164 : return NULL;
526 : :
527 [ + + ]: 177190 : if (!ParallelApplyTxnHash)
528 : 103239 : return NULL;
529 : :
530 : : /* Return the cached parallel apply worker if valid. */
531 [ + + ]: 73951 : if (stream_apply_worker)
532 : 73654 : return stream_apply_worker;
533 : :
534 : : /* Find an entry for the requested transaction. */
535 : 297 : entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
536 [ + - ]: 297 : if (found)
537 : : {
538 : : /* The worker must not have exited. */
539 [ - + ]: 297 : Assert(entry->winfo->in_use);
540 : 297 : return entry->winfo;
541 : : }
542 : :
1023 akapila@postgresql.o 543 :UBC 0 : return NULL;
544 : : }
545 : :
546 : : /*
547 : : * Makes the worker available for reuse.
548 : : *
549 : : * This removes the parallel apply worker entry from the hash table so that it
550 : : * can't be used. If there are enough workers in the pool, it stops the worker
551 : : * and frees the corresponding info. Otherwise it just marks the worker as
552 : : * available for reuse.
553 : : *
554 : : * For more information about the worker pool, see comments atop this file.
555 : : */
556 : : static void
1023 akapila@postgresql.o 557 :CBC 24 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
558 : : {
559 [ - + ]: 24 : Assert(!am_parallel_apply_worker());
560 [ - + ]: 24 : Assert(winfo->in_use);
561 [ - + ]: 24 : Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
562 : :
563 [ - + ]: 24 : if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
1023 akapila@postgresql.o 564 [ # # ]:UBC 0 : elog(ERROR, "hash table corrupted");
565 : :
566 : : /*
567 : : * Stop the worker if there are enough workers in the pool.
568 : : *
569 : : * XXX Additionally, we also stop the worker if the leader apply worker
570 : : * serializes part of the transaction data due to a send timeout. This is
571 : : * because the message could be partially written to the queue and there
572 : : * is no way to clean the queue other than resending the message until it
573 : : * succeeds. Instead of trying to send the data which anyway would have
574 : : * been serialized and then letting the parallel apply worker deal with
575 : : * the spurious message, we stop the worker.
576 : : */
1023 akapila@postgresql.o 577 [ + + ]:CBC 24 : if (winfo->serialize_changes ||
578 : 20 : list_length(ParallelApplyWorkerPool) >
579 [ + + ]: 20 : (max_parallel_apply_workers_per_subscription / 2))
580 : : {
903 581 : 5 : logicalrep_pa_worker_stop(winfo);
1023 582 : 5 : pa_free_worker_info(winfo);
583 : :
584 : 5 : return;
585 : : }
586 : :
587 : 19 : winfo->in_use = false;
588 : 19 : winfo->serialize_changes = false;
589 : : }
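
The retention rule used by pa_free_worker() can be isolated as a small predicate. This is a minimal sketch with a hypothetical function name (`sketch_should_stop_worker`) and plain parameters in place of the worker info, the pool list, and the max_parallel_apply_workers_per_subscription setting; it only restates the condition shown above.

```c
#include <stdbool.h>

/*
 * Decide whether a worker that has just finished its transaction should be
 * stopped rather than kept in the pool for reuse.
 *
 * serialize_changes: the leader spilled part of the transaction to a file
 * pool_size:         current number of workers in the pool
 * max_workers:       stand-in for max_parallel_apply_workers_per_subscription
 */
bool
sketch_should_stop_worker(bool serialize_changes, int pool_size,
						  int max_workers)
{
	/* Keep at most half of the configured maximum (integer division). */
	return serialize_changes || pool_size > (max_workers / 2);
}
```

For example, with a maximum of 2, a pool holding a single worker keeps it (1 is not greater than 1), while a pool of two stops the worker that just finished. A worker whose transaction was partially serialized is stopped regardless of pool size.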
590 : :
591 : : /*
592 : : * Free the parallel apply worker information and unlink the files with
593 : : * serialized changes if any.
594 : : */
595 : : static void
596 : 5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
597 : : {
598 [ - + ]: 5 : Assert(winfo);
599 : :
600 [ + - ]: 5 : if (winfo->mq_handle)
601 : 5 : shm_mq_detach(winfo->mq_handle);
602 : :
603 [ - + ]: 5 : if (winfo->error_mq_handle)
1023 akapila@postgresql.o 604 :UBC 0 : shm_mq_detach(winfo->error_mq_handle);
605 : :
606 : : /* Unlink the files with serialized changes. */
1023 akapila@postgresql.o 607 [ + + ]:CBC 5 : if (winfo->serialize_changes)
608 : 4 : stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
609 : :
610 [ + - ]: 5 : if (winfo->dsm_seg)
611 : 5 : dsm_detach(winfo->dsm_seg);
612 : :
613 : : /* Remove from the worker pool. */
614 : 5 : ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
615 : :
616 : 5 : pfree(winfo);
617 : 5 : }
618 : :
619 : : /*
620 : : * Detach the error queue for all parallel apply workers.
621 : : */
622 : : void
623 : 321 : pa_detach_all_error_mq(void)
624 : : {
625 : : ListCell *lc;
626 : :
627 [ + + + + : 326 : foreach(lc, ParallelApplyWorkerPool)
+ + ]
628 : : {
629 : 5 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
630 : :
903 631 [ + - ]: 5 : if (winfo->error_mq_handle)
632 : : {
633 : 5 : shm_mq_detach(winfo->error_mq_handle);
634 : 5 : winfo->error_mq_handle = NULL;
635 : : }
636 : : }
1023 637 : 321 : }
638 : :
639 : : /*
640 : : * Check if there are any pending spooled messages.
641 : : */
642 : : static bool
643 : 16 : pa_has_spooled_message_pending(void)
644 : : {
645 : : PartialFileSetState fileset_state;
646 : :
647 : 16 : fileset_state = pa_get_fileset_state();
648 : :
649 : 16 : return (fileset_state != FS_EMPTY);
650 : : }
651 : :
652 : : /*
653 : : * Replay the spooled messages once the leader apply worker has finished
654 : : * serializing changes to the file.
655 : : *
656 : : * Returns false if there aren't any pending spooled messages, true otherwise.
657 : : */
658 : : static bool
659 : 49 : pa_process_spooled_messages_if_required(void)
660 : : {
661 : : PartialFileSetState fileset_state;
662 : :
663 : 49 : fileset_state = pa_get_fileset_state();
664 : :
665 [ + + ]: 49 : if (fileset_state == FS_EMPTY)
666 : 41 : return false;
667 : :
668 : : /*
669 : : * If the leader apply worker is busy serializing the partial changes then
670 : : * acquire the stream lock now and wait for the leader worker to finish
671 : : * serializing the changes. Otherwise, the parallel apply worker won't get
672 : : * a chance to receive a STREAM_STOP (and acquire the stream lock) until
673 : : * the leader has serialized all changes, which can lead to undetected
674 : : * deadlock.
675 : : *
676 : : * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
677 : : * worker has finished serializing the changes.
678 : : */
679 [ - + ]: 8 : if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
680 : : {
1023 akapila@postgresql.o 681 :UBC 0 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
682 : 0 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
683 : :
684 : 0 : fileset_state = pa_get_fileset_state();
685 : : }
686 : :
687 : : /*
688 : : * We cannot read the file immediately after the leader has serialized all
689 : : * changes to the file because there may still be messages in the memory
690 : : * queue. We will apply all spooled messages the next time we call this
691 : : * function and that will ensure there are no messages left in the memory
692 : : * queue.
693 : : */
1023 akapila@postgresql.o 694 [ + + ]:CBC 8 : if (fileset_state == FS_SERIALIZE_DONE)
695 : : {
696 : 4 : pa_set_fileset_state(MyParallelShared, FS_READY);
697 : : }
698 [ + - ]: 4 : else if (fileset_state == FS_READY)
699 : : {
700 : 4 : apply_spooled_messages(&MyParallelShared->fileset,
701 : 4 : MyParallelShared->xid,
702 : : InvalidXLogRecPtr);
703 : 4 : pa_set_fileset_state(MyParallelShared, FS_EMPTY);
704 : : }
705 : :
706 : 8 : return true;
707 : : }
708 : :
709 : : /*
710 : : * Interrupt handler for main loop of parallel apply worker.
711 : : */
712 : : static void
713 : 64004 : ProcessParallelApplyInterrupts(void)
714 : : {
715 [ + + ]: 64004 : CHECK_FOR_INTERRUPTS();
716 : :
717 [ + + ]: 64001 : if (ShutdownRequestPending)
718 : : {
719 [ + - ]: 5 : ereport(LOG,
720 : : (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
721 : : MySubscription->name)));
722 : :
723 : 5 : proc_exit(0);
724 : : }
725 : :
726 [ + + ]: 63996 : if (ConfigReloadPending)
727 : : {
728 : 4 : ConfigReloadPending = false;
729 : 4 : ProcessConfigFile(PGC_SIGHUP);
730 : : }
731 : 63996 : }
732 : :
733 : : /* Parallel apply worker main loop. */
734 : : static void
735 : 10 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
736 : : {
737 : : shm_mq_result shmq_res;
738 : : ErrorContextCallback errcallback;
739 : 10 : MemoryContext oldcxt = CurrentMemoryContext;
740 : :
741 : : /*
742 : : * Init the ApplyMessageContext which we clean up after each replication
743 : : * protocol message.
744 : : */
745 : 10 : ApplyMessageContext = AllocSetContextCreate(ApplyContext,
746 : : "ApplyMessageContext",
747 : : ALLOCSET_DEFAULT_SIZES);
748 : :
749 : : /*
750 : : * Push apply error context callback. Fields will be filled while applying
751 : : * a change.
752 : : */
753 : 10 : errcallback.callback = apply_error_callback;
754 : 10 : errcallback.previous = error_context_stack;
755 : 10 : error_context_stack = &errcallback;
756 : :
757 : : for (;;)
758 : 63994 : {
759 : : void *data;
760 : : Size len;
761 : :
762 : 64004 : ProcessParallelApplyInterrupts();
763 : :
764 : : /* Ensure we are reading the data into our memory context. */
765 : 63996 : MemoryContextSwitchTo(ApplyMessageContext);
766 : :
767 : 63996 : shmq_res = shm_mq_receive(mqh, &len, &data, true);
768 : :
769 [ + + ]: 63996 : if (shmq_res == SHM_MQ_SUCCESS)
770 : : {
771 : : StringInfoData s;
772 : : int c;
773 : :
774 [ - + ]: 63947 : if (len == 0)
1023 akapila@postgresql.o 775 [ # # ]:UBC 0 : elog(ERROR, "invalid message length");
776 : :
733 drowley@postgresql.o 777 :CBC 63947 : initReadOnlyStringInfo(&s, data, len);
778 : :
779 : : /*
780 : : * The first byte of messages sent from leader apply worker to
781 : : * parallel apply workers can only be PqReplMsg_WALData.
782 : : */
1023 akapila@postgresql.o 783 : 63947 : c = pq_getmsgbyte(&s);
83 nathan@postgresql.or 784 [ - + ]:GNC 63947 : if (c != PqReplMsg_WALData)
1023 akapila@postgresql.o 785 [ # # ]:UBC 0 : elog(ERROR, "unexpected message \"%c\"", c);
786 : :
787 : : /*
788 : : * Ignore statistics fields that have been updated by the leader
789 : : * apply worker.
790 : : *
791 : : * XXX We can avoid sending the statistics fields from the leader
792 : : * apply worker but for that, it needs to rebuild the entire
793 : : * message by removing these fields which could be more work than
794 : : * simply ignoring these fields in the parallel apply worker.
795 : : */
1023 akapila@postgresql.o 796 :CBC 63947 : s.cursor += SIZE_STATS_MESSAGE;
797 : :
798 : 63947 : apply_dispatch(&s);
799 : : }
800 [ + - ]: 49 : else if (shmq_res == SHM_MQ_WOULD_BLOCK)
801 : : {
802 : : /* Replay the changes from the file, if any. */
803 [ + + ]: 49 : if (!pa_process_spooled_messages_if_required())
804 : : {
805 : : int rc;
806 : :
807 : : /* Wait for more work. */
808 : 41 : rc = WaitLatch(MyLatch,
809 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
810 : : 1000L,
811 : : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
812 : :
813 [ + + ]: 41 : if (rc & WL_LATCH_SET)
814 : 38 : ResetLatch(MyLatch);
815 : : }
816 : : }
817 : : else
818 : : {
1023 akapila@postgresql.o 819 [ # # ]:UBC 0 : Assert(shmq_res == SHM_MQ_DETACHED);
820 : :
821 [ # # ]: 0 : ereport(ERROR,
822 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 : : errmsg("lost connection to the logical replication apply worker")));
824 : : }
825 : :
1023 akapila@postgresql.o 826 :CBC 63994 : MemoryContextReset(ApplyMessageContext);
827 : 63994 : MemoryContextSwitchTo(oldcxt);
828 : : }
829 : :
830 : : /* Pop the error context stack. */
831 : : error_context_stack = errcallback.previous;
832 : :
833 : : MemoryContextSwitchTo(oldcxt);
834 : : }
835 : :
836 : : /*
837 : : * Make sure the leader apply worker tries to read from our error queue one more
838 : : * time. This guards against the case where we exit uncleanly without sending
839 : : * an ErrorResponse, for example because some code calls proc_exit directly.
840 : : *
841 : : * Also explicitly detach from the dsm segment to invoke on_dsm_detach callbacks,
842 : : * if any. See ParallelWorkerShutdown for details.
843 : : */
844 : : static void
845 : 10 : pa_shutdown(int code, Datum arg)
846 : : {
1014 847 : 10 : SendProcSignal(MyLogicalRepWorker->leader_pid,
848 : : PROCSIG_PARALLEL_APPLY_MESSAGE,
849 : : INVALID_PROC_NUMBER);
850 : :
1023 851 : 10 : dsm_detach((dsm_segment *) DatumGetPointer(arg));
852 : 10 : }
853 : :
854 : : /*
855 : : * Parallel apply worker entry point.
856 : : */
857 : : void
858 : 10 : ParallelApplyWorkerMain(Datum main_arg)
859 : : {
860 : : ParallelApplyWorkerShared *shared;
861 : : dsm_handle handle;
862 : : dsm_segment *seg;
863 : : shm_toc *toc;
864 : : shm_mq *mq;
865 : : shm_mq_handle *mqh;
866 : : shm_mq_handle *error_mqh;
867 : : RepOriginId originid;
868 : 10 : int worker_slot = DatumGetInt32(main_arg);
869 : : char originname[NAMEDATALEN];
870 : :
909 871 : 10 : InitializingApplyWorker = true;
872 : :
873 : : /*
874 : : * Setup signal handling.
875 : : *
876 : : * Note: We intentionally used SIGUSR2 to trigger a graceful shutdown
877 : : * initiated by the leader apply worker. This helps to differentiate it
878 : : * from the case where we abort the current transaction and exit on
879 : : * receiving SIGTERM.
880 : : */
1023 881 : 10 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
882 : 10 : pqsignal(SIGTERM, die);
34 883 : 10 : pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
1023 884 : 10 : BackgroundWorkerUnblockSignals();
885 : :
886 : : /*
887 : : * Attach to the dynamic shared memory segment for the parallel apply, and
888 : : * find its table of contents.
889 : : *
890 : : * Like parallel query, we don't need a resource owner by this time. See
891 : : * ParallelWorkerMain.
892 : : */
893 : 10 : memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
894 : 10 : seg = dsm_attach(handle);
895 [ - + ]: 10 : if (!seg)
1023 akapila@postgresql.o 896 [ # # ]:UBC 0 : ereport(ERROR,
897 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
898 : : errmsg("could not map dynamic shared memory segment")));
899 : :
1023 akapila@postgresql.o 900 :CBC 10 : toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
901 [ - + ]: 10 : if (!toc)
1023 akapila@postgresql.o 902 [ # # ]:UBC 0 : ereport(ERROR,
903 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 : : errmsg("invalid magic number in dynamic shared memory segment")));
905 : :
906 : : /* Look up the shared information. */
1023 akapila@postgresql.o 907 :CBC 10 : shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
908 : 10 : MyParallelShared = shared;
909 : :
910 : : /*
911 : : * Attach to the message queue.
912 : : */
913 : 10 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
914 : 10 : shm_mq_set_receiver(mq, MyProc);
915 : 10 : mqh = shm_mq_attach(mq, seg, NULL);
916 : :
917 : : /*
918 : : * Primary initialization is complete. Now, we can attach to our slot.
919 : : * This is to ensure that the leader apply worker does not write data to
920 : : * the uninitialized memory queue.
921 : : */
922 : 10 : logicalrep_worker_attach(worker_slot);
923 : :
924 : : /*
925 : : * Register the shutdown callback after we are attached to the worker
926 : : * slot. This is to ensure that MyLogicalRepWorker remains valid when this
927 : : * callback is invoked.
928 : : */
903 929 : 10 : before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
930 : :
1023 931 [ - + ]: 10 : SpinLockAcquire(&MyParallelShared->mutex);
932 : 10 : MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
933 : 10 : MyParallelShared->logicalrep_worker_slot_no = worker_slot;
934 : 10 : SpinLockRelease(&MyParallelShared->mutex);
935 : :
936 : : /*
937 : : * Attach to the error queue.
938 : : */
939 : 10 : mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
940 : 10 : shm_mq_set_sender(mq, MyProc);
941 : 10 : error_mqh = shm_mq_attach(mq, seg, NULL);
942 : :
943 : 10 : pq_redirect_to_shm_mq(seg, error_mqh);
1014 944 : 10 : pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
945 : : INVALID_PROC_NUMBER);
946 : :
1023 947 : 10 : MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
948 : 10 : MyLogicalRepWorker->reply_time = 0;
949 : :
817 950 : 10 : InitializeLogRepWorker();
951 : :
909 952 : 10 : InitializingApplyWorker = false;
953 : :
954 : : /* Setup replication origin tracking. */
1023 955 : 10 : StartTransactionCommand();
956 : 10 : ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
957 : : originname, sizeof(originname));
958 : 10 : originid = replorigin_by_name(originname, false);
959 : :
960 : : /*
961 : : * The parallel apply worker doesn't need to monopolize this replication
962 : : * origin which was already acquired by its leader process.
963 : : */
1014 964 : 10 : replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
1023 965 : 10 : replorigin_session_origin = originid;
966 : 10 : CommitTransactionCommand();
967 : :
968 : : /*
969 : : * Setup callback for syscache so that we know when something changes in
970 : : * the subscription relation state.
971 : : */
972 : 10 : CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
973 : : InvalidateSyncingRelStates,
974 : : (Datum) 0);
975 : :
976 : 10 : set_apply_error_context_origin(originname);
977 : :
978 : 10 : LogicalParallelApplyLoop(mqh);
979 : :
980 : : /*
981 : : * The parallel apply worker must not get here because the parallel apply
982 : : * worker will only stop when it receives a SIGTERM or SIGUSR2 from the
983 : : * leader, or SIGINT from itself, or when there is an error. None of these
984 : : * cases will allow the code to reach here.
985 : : */
1023 akapila@postgresql.o 986 :UBC 0 : Assert(false);
987 : : }
988 : :
989 : : /*
990 : : * Handle receipt of an interrupt indicating a parallel apply worker message.
991 : : *
992 : : * Note: this is called within a signal handler! All we can do is set a flag
993 : : * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
994 : : * ProcessParallelApplyMessages().
995 : : */
996 : : void
1023 akapila@postgresql.o 997 :CBC 11 : HandleParallelApplyMessageInterrupt(void)
998 : : {
999 : 11 : InterruptPending = true;
1000 : 11 : ParallelApplyMessagePending = true;
1001 : 11 : SetLatch(MyLatch);
1002 : 11 : }
1003 : :
1004 : : /*
1005 : : * Process a single protocol message received from a single parallel apply
1006 : : * worker.
1007 : : */
1008 : : static void
237 heikki.linnakangas@i 1009 : 2 : ProcessParallelApplyMessage(StringInfo msg)
1010 : : {
1011 : : char msgtype;
1012 : :
1023 akapila@postgresql.o 1013 : 2 : msgtype = pq_getmsgbyte(msg);
1014 : :
1015 [ + - - ]: 2 : switch (msgtype)
1016 : : {
64 nathan@postgresql.or 1017 :GNC 2 : case PqMsg_ErrorResponse:
1018 : : {
1019 : : ErrorData edata;
1020 : :
1021 : : /* Parse ErrorResponse. */
1023 akapila@postgresql.o 1022 :CBC 2 : pq_parse_errornotice(msg, &edata);
1023 : :
1024 : : /*
1025 : : * If desired, add a context line to show that this is a
1026 : : * message propagated from a parallel apply worker. Otherwise,
1027 : : * it can sometimes be confusing to understand what actually
1028 : : * happened.
1029 : : */
1030 [ + - ]: 2 : if (edata.context)
1031 : 2 : edata.context = psprintf("%s\n%s", edata.context,
1032 : : _("logical replication parallel apply worker"));
1033 : : else
1023 akapila@postgresql.o 1034 :UBC 0 : edata.context = pstrdup(_("logical replication parallel apply worker"));
1035 : :
1036 : : /*
1037 : : * Context beyond that should use the error context callbacks
1038 : : * that were in effect in LogicalRepApplyLoop().
1039 : : */
1023 akapila@postgresql.o 1040 :CBC 2 : error_context_stack = apply_error_context_stack;
1041 : :
1042 : : /*
1043 : : * The actual error must have been reported by the parallel
1044 : : * apply worker.
1045 : : */
1046 [ + - ]: 2 : ereport(ERROR,
1047 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1048 : : errmsg("logical replication parallel apply worker exited due to error"),
1049 : : errcontext("%s", edata.context)));
1050 : : }
1051 : :
1052 : : /*
1053 : : * Don't need to do anything about NoticeResponse and
1054 : : * NotificationResponse as the logical replication worker doesn't
1055 : : * need to send messages to the client.
1056 : : */
64 nathan@postgresql.or 1057 :UNC 0 : case PqMsg_NoticeResponse:
1058 : : case PqMsg_NotificationResponse:
1023 akapila@postgresql.o 1059 :UBC 0 : break;
1060 : :
1061 : 0 : default:
1062 [ # # ]: 0 : elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1063 : : msgtype, msg->len);
1064 : : }
1065 : 0 : }
1066 : :
1067 : : /*
1068 : : * Handle any queued protocol messages received from parallel apply workers.
1069 : : */
1070 : : void
237 heikki.linnakangas@i 1071 :CBC 7 : ProcessParallelApplyMessages(void)
1072 : : {
1073 : : ListCell *lc;
1074 : : MemoryContext oldcontext;
1075 : :
1076 : : static MemoryContext hpam_context = NULL;
1077 : :
1078 : : /*
1079 : : * This is invoked from ProcessInterrupts(), and since some of the
1080 : : * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1081 : : * for recursive calls if more signals are received while this runs. It's
1082 : : * unclear that recursive entry would be safe, and it doesn't seem useful
1083 : : * even if it is safe, so let's block interrupts until done.
1084 : : */
1023 akapila@postgresql.o 1085 : 7 : HOLD_INTERRUPTS();
1086 : :
1087 : : /*
1088 : : * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1089 : : * don't want to risk leaking data into long-lived contexts, so let's do
1090 : : * our work here in a private context that we can reset on each use.
1091 : : */
1092 [ + + ]: 7 : if (!hpam_context) /* first time through? */
1093 : 6 : hpam_context = AllocSetContextCreate(TopMemoryContext,
1094 : : "ProcessParallelApplyMessages",
1095 : : ALLOCSET_DEFAULT_SIZES);
1096 : : else
1097 : 1 : MemoryContextReset(hpam_context);
1098 : :
1099 : 7 : oldcontext = MemoryContextSwitchTo(hpam_context);
1100 : :
1101 : 7 : ParallelApplyMessagePending = false;
1102 : :
1103 [ + - + + : 13 : foreach(lc, ParallelApplyWorkerPool)
+ + ]
1104 : : {
1105 : : shm_mq_result res;
1106 : : Size nbytes;
1107 : : void *data;
1108 : 8 : ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
1109 : :
1110 : : /*
1111 : : * The leader will detach from the error queue and set it to NULL
1112 : : * before preparing to stop all parallel apply workers, so we don't
1113 : : * need to handle error messages anymore. See
1114 : : * logicalrep_worker_detach.
1115 : : */
1116 [ + + ]: 8 : if (!winfo->error_mq_handle)
1117 : 6 : continue;
1118 : :
1119 : 3 : res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1120 : :
1121 [ + + ]: 3 : if (res == SHM_MQ_WOULD_BLOCK)
1122 : 1 : continue;
1123 [ + - ]: 2 : else if (res == SHM_MQ_SUCCESS)
1124 : : {
1125 : : StringInfoData msg;
1126 : :
1127 : 2 : initStringInfo(&msg);
1128 : 2 : appendBinaryStringInfo(&msg, data, nbytes);
237 heikki.linnakangas@i 1129 : 2 : ProcessParallelApplyMessage(&msg);
1023 akapila@postgresql.o 1130 :UBC 0 : pfree(msg.data);
1131 : : }
1132 : : else
1133 [ # # ]: 0 : ereport(ERROR,
1134 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1135 : : errmsg("lost connection to the logical replication parallel apply worker")));
1136 : : }
1137 : :
1023 akapila@postgresql.o 1138 :CBC 5 : MemoryContextSwitchTo(oldcontext);
1139 : :
1140 : : /* Might as well clear the context on our way out */
1141 : 5 : MemoryContextReset(hpam_context);
1142 : :
1143 [ - + ]: 5 : RESUME_INTERRUPTS();
1144 : 5 : }
1145 : :
1146 : : /*
1147 : : * Send the data to the specified parallel apply worker via shared-memory
1148 : : * queue.
1149 : : *
1150 : : * Returns false if the attempt to send data via shared memory times out, true
1151 : : * otherwise.
1152 : : */
1153 : : bool
1154 : 68914 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
1155 : : {
1156 : : int rc;
1157 : : shm_mq_result result;
1158 : 68914 : TimestampTz startTime = 0;
1159 : :
1160 [ - + ]: 68914 : Assert(!IsTransactionState());
1161 [ - + ]: 68914 : Assert(!winfo->serialize_changes);
1162 : :
1163 : : /*
1164 : : * We don't try to send data to the parallel apply worker in 'immediate' mode. This
1165 : : * is primarily used for testing purposes.
1166 : : */
791 peter@eisentraut.org 1167 [ + + ]: 68914 : if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
999 akapila@postgresql.o 1168 : 4 : return false;
1169 : :
1170 : : /*
1171 : : * This timeout is a bit arbitrary but testing revealed that it is sufficient
1172 : : * to send the message unless the parallel apply worker is waiting on some
1173 : : * lock or there is a serious resource crunch. See the comments atop this file
1174 : : * to know why we are using a non-blocking way to send the message.
1175 : : */
1176 : : #define SHM_SEND_RETRY_INTERVAL_MS 1000
1177 : : #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1178 : :
1179 : : for (;;)
1180 : : {
1023 1181 : 68910 : result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1182 : :
1183 [ + - ]: 68910 : if (result == SHM_MQ_SUCCESS)
1184 : 68910 : return true;
1023 akapila@postgresql.o 1185 [ # # ]:UBC 0 : else if (result == SHM_MQ_DETACHED)
1186 [ # # ]: 0 : ereport(ERROR,
1187 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1188 : : errmsg("could not send data to shared-memory queue")));
1189 : :
1190 [ # # ]: 0 : Assert(result == SHM_MQ_WOULD_BLOCK);
1191 : :
1192 : : /* Wait before retrying. */
1193 : 0 : rc = WaitLatch(MyLatch,
1194 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1195 : : SHM_SEND_RETRY_INTERVAL_MS,
1196 : : WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1197 : :
1198 [ # # ]: 0 : if (rc & WL_LATCH_SET)
1199 : : {
1200 : 0 : ResetLatch(MyLatch);
1201 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
1202 : : }
1203 : :
1204 [ # # ]: 0 : if (startTime == 0)
1205 : 0 : startTime = GetCurrentTimestamp();
1206 [ # # ]: 0 : else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1207 : : SHM_SEND_TIMEOUT_MS))
1208 : 0 : return false;
1209 : : }
1210 : : }
1211 : :
1212 : : /*
1213 : : * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1214 : : * that the current data and any subsequent data for this transaction will be
1215 : : * serialized to a file. This is done to prevent possible deadlocks with
1216 : : * another parallel apply worker (refer to the comments atop this file).
1217 : : */
1218 : : void
1023 akapila@postgresql.o 1219 :CBC 4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
1220 : : bool stream_locked)
1221 : : {
999 1222 [ + - ]: 4 : ereport(LOG,
1223 : : (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1224 : : winfo->shared->xid)));
1225 : :
1226 : : /*
1227 : : * The parallel apply worker could be stuck for some reason (say, waiting
1228 : : * on a lock held by another backend), so stop trying to send data directly to
1229 : : * it and start serializing data to the file instead.
1230 : : */
1023 1231 : 4 : winfo->serialize_changes = true;
1232 : :
1233 : : /* Initialize the stream fileset. */
1234 : 4 : stream_start_internal(winfo->shared->xid, true);
1235 : :
1236 : : /*
1237 : : * Acquire the stream lock, if not already held, to make sure that the parallel
1238 : : * apply worker will wait for the leader to release the stream lock until
1239 : : * the end of the transaction.
1240 : : */
1241 [ + - ]: 4 : if (!stream_locked)
1242 : 4 : pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1243 : :
1244 : 4 : pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
1245 : 4 : }
1246 : :
1247 : : /*
1248 : : * Wait until the parallel apply worker's transaction state has reached or
1249 : : * exceeded the given xact_state.
1250 : : */
1251 : : static void
1252 : 25 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
1253 : : ParallelTransState xact_state)
1254 : : {
1255 : : for (;;)
1256 : : {
1257 : : /*
1258 : : * Stop if the transaction state has reached or exceeded the given
1259 : : * xact_state.
1260 : : */
1261 [ + + ]: 273 : if (pa_get_xact_state(winfo->shared) >= xact_state)
1262 : 25 : break;
1263 : :
1264 : : /* Wait to be signalled. */
1265 : 248 : (void) WaitLatch(MyLatch,
1266 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1267 : : 10L,
1268 : : WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1269 : :
1270 : : /* Reset the latch so we don't spin. */
1271 : 248 : ResetLatch(MyLatch);
1272 : :
1273 : : /* An interrupt may have occurred while we were waiting. */
1274 [ - + ]: 248 : CHECK_FOR_INTERRUPTS();
1275 : : }
1276 : 25 : }
1277 : :
1278 : : /*
1279 : : * Wait until the parallel apply worker's transaction finishes.
1280 : : */
1281 : : static void
1282 : 25 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
1283 : : {
1284 : : /*
1285 : : * Wait until the parallel apply worker sets the state to
1286 : : * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
1287 : : * lock. This is to prevent the leader apply worker from acquiring the
1288 : : * transaction lock earlier than the parallel apply worker.
1289 : : */
1290 : 25 : pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
1291 : :
1292 : : /*
1293 : : * Wait for the transaction lock to be released. This is required to
1294 : : * detect deadlock among leader and parallel apply workers. Refer to the
1295 : : * comments atop this file.
1296 : : */
1297 : 25 : pa_lock_transaction(winfo->shared->xid, AccessShareLock);
1298 : 24 : pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
1299 : :
1300 : : /*
1301 : : * Check that the state has become PARALLEL_TRANS_FINISHED, because the
1302 : : * lock is also released if the parallel apply worker fails while
1303 : : * applying changes.
1304 : : */
1305 [ - + ]: 24 : if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
1023 akapila@postgresql.o 1306 [ # # ]:UBC 0 : ereport(ERROR,
1307 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 : : errmsg("lost connection to the logical replication parallel apply worker")));
1023 akapila@postgresql.o 1309 :CBC 24 : }
1310 : :
1311 : : /*
1312 : : * Set the transaction state for a given parallel apply worker.
1313 : : */
1314 : : void
1315 : 51 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
1316 : : ParallelTransState xact_state)
1317 : : {
1318 [ - + ]: 51 : SpinLockAcquire(&wshared->mutex);
1319 : 51 : wshared->xact_state = xact_state;
1320 : 51 : SpinLockRelease(&wshared->mutex);
1321 : 51 : }
1322 : :
1323 : : /*
1324 : : * Get the transaction state for a given parallel apply worker.
1325 : : */
1326 : : static ParallelTransState
1327 : 321 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
1328 : : {
1329 : : ParallelTransState xact_state;
1330 : :
1331 [ - + ]: 321 : SpinLockAcquire(&wshared->mutex);
1332 : 321 : xact_state = wshared->xact_state;
1333 : 321 : SpinLockRelease(&wshared->mutex);
1334 : :
1335 : 321 : return xact_state;
1336 : : }
1337 : :
1338 : : /*
1339 : : * Cache the parallel apply worker information.
1340 : : */
1341 : : void
1342 : 524 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
1343 : : {
1344 : 524 : stream_apply_worker = winfo;
1345 : 524 : }
1346 : :
1347 : : /*
1348 : : * Form a unique savepoint name for the streaming transaction.
1349 : : *
1350 : : * Note that different subscriptions for publications on different nodes can
1351 : : * receive the same remote xid, so we need to use the subscription id along with it.
1352 : : *
1353 : : * Returns the name in the supplied buffer.
1354 : : */
1355 : : static void
1356 : 27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
1357 : : {
1358 : 27 : snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1359 : 27 : }
1360 : :
1361 : : /*
1362 : : * Define a savepoint for a subxact in parallel apply worker if needed.
1363 : : *
1364 : : * The parallel apply worker can figure out if a new subtransaction was
1365 : : * started by checking if the new change arrived with a different xid. In that
1366 : : * case define a named savepoint, so that we are able to roll back to it
1367 : : * if required.
1368 : : */
1369 : : void
1370 : 68451 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
1371 : : {
1372 [ + + ]: 68451 : if (current_xid != top_xid &&
1373 [ + + ]: 52 : !list_member_xid(subxactlist, current_xid))
1374 : : {
1375 : : MemoryContext oldctx;
1376 : : char spname[NAMEDATALEN];
1377 : :
1378 : 17 : pa_savepoint_name(MySubscription->oid, current_xid,
1379 : : spname, sizeof(spname));
1380 : :
1381 [ + + ]: 17 : elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1382 : :
1383 : : /* We must be in a transaction block to define the SAVEPOINT. */
1384 [ + + ]: 17 : if (!IsTransactionBlock())
1385 : : {
1386 [ - + ]: 5 : if (!IsTransactionState())
1023 akapila@postgresql.o 1387 :UBC 0 : StartTransactionCommand();
1388 : :
1023 akapila@postgresql.o 1389 :CBC 5 : BeginTransactionBlock();
1390 : 5 : CommitTransactionCommand();
1391 : : }
1392 : :
1393 : 17 : DefineSavepoint(spname);
1394 : :
1395 : : /*
1396 : : * CommitTransactionCommand is needed to start a subtransaction after
1397 : : * issuing a SAVEPOINT inside a transaction block (see
1398 : : * StartSubTransaction()).
1399 : : */
1400 : 17 : CommitTransactionCommand();
1401 : :
1402 : 17 : oldctx = MemoryContextSwitchTo(TopTransactionContext);
1403 : 17 : subxactlist = lappend_xid(subxactlist, current_xid);
1404 : 17 : MemoryContextSwitchTo(oldctx);
1405 : : }
1406 : 68451 : }
1407 : :
1408 : : /* Reset the list that maintains subtransactions. */
1409 : : void
1410 : 24 : pa_reset_subtrans(void)
1411 : : {
1412 : : /*
1413 : : * We don't need to free this explicitly as the allocated memory will be
1414 : : * freed at the transaction end.
1415 : : */
1416 : 24 : subxactlist = NIL;
1417 : 24 : }
1418 : :
1419 : : /*
1420 : : * Handle STREAM ABORT message when the transaction was applied in a parallel
1421 : : * apply worker.
1422 : : */
1423 : : void
1424 : 12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
1425 : : {
1426 : 12 : TransactionId xid = abort_data->xid;
1427 : 12 : TransactionId subxid = abort_data->subxid;
1428 : :
1429 : : /*
1430 : : * Update origin state so we can restart streaming from correct position
1431 : : * in case of crash.
1432 : : */
1433 : 12 : replorigin_session_origin_lsn = abort_data->abort_lsn;
1434 : 12 : replorigin_session_origin_timestamp = abort_data->abort_time;
1435 : :
1436 : : /*
1437 : : * If the two XIDs are the same, it's in fact an abort of the top-level xact, so
1438 : : * just free the subxactlist.
1439 : : */
1440 [ + + ]: 12 : if (subxid == xid)
1441 : : {
1442 : 2 : pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1443 : :
1444 : : /*
1445 : : * Release the lock as we might be processing an empty streaming
1446 : : * transaction in which case the lock won't be released during
1447 : : * transaction rollback.
1448 : : *
1449 : : * Note that it's ok to release the transaction lock before aborting
1450 : : * the transaction because even if the parallel apply worker dies due
1451 : : * to crash or some other reason, such a transaction would still be
1452 : : * considered aborted.
1453 : : */
1454 : 2 : pa_unlock_transaction(xid, AccessExclusiveLock);
1455 : :
1456 : 2 : AbortCurrentTransaction();
1457 : :
1458 [ + + ]: 2 : if (IsTransactionBlock())
1459 : : {
1460 : 1 : EndTransactionBlock(false);
1461 : 1 : CommitTransactionCommand();
1462 : : }
1463 : :
1464 : 2 : pa_reset_subtrans();
1465 : :
1466 : 2 : pgstat_report_activity(STATE_IDLE, NULL);
1467 : : }
1468 : : else
1469 : : {
1470 : : /* OK, so it's a subxact. Roll back to the savepoint. */
1471 : : int i;
1472 : : char spname[NAMEDATALEN];
1473 : :
1474 : 10 : pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1475 : :
1476 [ + + ]: 10 : elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1477 : :
1478 : : /*
1479 : : * Search the subxactlist, determine the offset tracked for the
1480 : : * subxact, and truncate the list.
1481 : : *
1482 : : * Note that for an empty sub-transaction we won't find the subxid
1483 : : * here.
1484 : : */
1485 [ + + ]: 12 : for (i = list_length(subxactlist) - 1; i >= 0; i--)
1486 : : {
1487 : 11 : TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
1488 : :
1489 [ + + ]: 11 : if (xid_tmp == subxid)
1490 : : {
1491 : 9 : RollbackToSavepoint(spname);
1492 : 9 : CommitTransactionCommand();
1493 : 9 : subxactlist = list_truncate(subxactlist, i);
1494 : 9 : break;
1495 : : }
1496 : : }
1497 : : }
1498 : 12 : }
1499 : :
1500 : : /*
1501 : : * Set the fileset state for a particular parallel apply worker. The fileset
1502 : : * will be set once the leader worker has serialized all changes to the file
1503 : : * so that it can be used by the parallel apply worker.
1504 : : */
1505 : : void
1506 : 16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
1507 : : PartialFileSetState fileset_state)
1508 : : {
1509 [ - + ]: 16 : SpinLockAcquire(&wshared->mutex);
1510 : 16 : wshared->fileset_state = fileset_state;
1511 : :
1512 [ + + ]: 16 : if (fileset_state == FS_SERIALIZE_DONE)
1513 : : {
1514 [ - + ]: 4 : Assert(am_leader_apply_worker());
1515 [ - + ]: 4 : Assert(MyLogicalRepWorker->stream_fileset);
1516 : 4 : wshared->fileset = *MyLogicalRepWorker->stream_fileset;
1517 : : }
1518 : :
1519 : 16 : SpinLockRelease(&wshared->mutex);
1520 : 16 : }
1521 : :
1522 : : /*
1523 : : * Get the fileset state for the current parallel apply worker.
1524 : : */
1525 : : static PartialFileSetState
1526 : 65 : pa_get_fileset_state(void)
1527 : : {
1528 : : PartialFileSetState fileset_state;
1529 : :
1530 [ - + ]: 65 : Assert(am_parallel_apply_worker());
1531 : :
1532 [ - + ]: 65 : SpinLockAcquire(&MyParallelShared->mutex);
1533 : 65 : fileset_state = MyParallelShared->fileset_state;
1534 : 65 : SpinLockRelease(&MyParallelShared->mutex);
1535 : :
1536 : 65 : return fileset_state;
1537 : : }
1538 : :
1539 : : /*
1540 : : * Helper functions to acquire and release a lock for each stream block.
1541 : : *
1542 : : * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
1543 : : * stream lock.
1544 : : *
1545 : : * Refer to the comments atop this file to see how the stream lock is used.
1546 : : */
1547 : : void
1548 : 284 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
1549 : : {
1550 : 284 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1551 : : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1552 : 282 : }
1553 : :
1554 : : void
1555 : 280 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
1556 : : {
1557 : 280 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1558 : : PARALLEL_APPLY_LOCK_STREAM, lockmode);
1559 : 280 : }
1560 : :
1561 : : /*
1562 : : * Helper functions to acquire and release a lock for each local transaction
1563 : : * apply.
1564 : : *
1565 : : * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
1566 : : * transaction lock.
1567 : : *
1568 : : * Note that all the callers must pass a remote transaction ID instead of a
1569 : : * local transaction ID as xid. This is because the local transaction ID will
1570 : : * only be assigned while applying the first change in the parallel apply worker, but
1571 : : * it's possible that the first change in the parallel apply worker is blocked
1572 : : * by a concurrently executing transaction in another parallel apply worker. We
1573 : : * can only communicate the local transaction id to the leader after applying
1574 : : * the first change so it won't be able to wait after sending the xact finish
1575 : : * command using this lock.
1576 : : *
1577 : : * Refer to the comments atop this file to see how the transaction lock is
1578 : : * used.
1579 : : */
1580 : : void
1581 : 52 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
1582 : : {
1583 : 52 : LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1584 : : PARALLEL_APPLY_LOCK_XACT, lockmode);
1585 : 51 : }
1586 : :
1587 : : void
1588 : 48 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
1589 : : {
1590 : 48 : UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1591 : : PARALLEL_APPLY_LOCK_XACT, lockmode);
1592 : 48 : }
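
The stream lock and the transaction lock helpers above differ only in the last argument they pass for the same (subscription, remote xid) pair. The following is a minimal standalone sketch of that idea, not PostgreSQL code: the `demo_*` names, the struct layout, and the constant values are hypothetical stand-ins chosen for illustration, and the database field is an assumption about what else the real lock tag carries.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for PARALLEL_APPLY_LOCK_STREAM / _XACT. */
#define DEMO_LOCK_STREAM 0
#define DEMO_LOCK_XACT   1

/* Simplified model of a four-field lock tag (assumed layout). */
typedef struct
{
    uint32_t dbid;   /* database the subscription lives in (assumption) */
    uint32_t subid;  /* subscription OID */
    uint32_t xid;    /* remote transaction ID, never the local one */
    uint16_t objid;  /* fourth field: which kind of apply lock this is */
} demo_locktag;

/* Build a tag the way the helpers above conceptually do. */
static demo_locktag
demo_make_tag(uint32_t dbid, uint32_t subid, uint32_t xid, uint16_t objid)
{
    demo_locktag tag = {dbid, subid, xid, objid};

    return tag;
}

/* Two requests refer to the same lock only if every field matches. */
static bool
demo_tag_equal(const demo_locktag *a, const demo_locktag *b)
{
    return a->dbid == b->dbid && a->subid == b->subid &&
           a->xid == b->xid && a->objid == b->objid;
}
```

Because the fourth field takes part in the comparison, a holder of the stream lock for a given remote xid does not conflict with a holder of the transaction lock for that same xid in this model.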
1593 : :
1594 : : /*
1595 : : * Decrement the number of pending streaming blocks and wait on the stream lock
1596 : : * if there is no pending block available.
1597 : : */
1598 : : void
1599 : 261 : pa_decr_and_wait_stream_block(void)
1600 : : {
1601 [ - + ]: 261 : Assert(am_parallel_apply_worker());
1602 : :
1603 : : /*
1604 : : * It is only possible to not have any pending stream chunks when we are
1605 : : * applying spooled messages.
1606 : : */
1607 [ + + ]: 261 : if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
1608 : : {
1609 [ + - ]: 16 : if (pa_has_spooled_message_pending())
1610 : 16 : return;
1611 : :
1023 akapila@postgresql.o 1612 [ # # ]:UBC 0 : elog(ERROR, "invalid pending streaming chunk 0");
1613 : : }
1614 : :
1023 akapila@postgresql.o 1615 [ + + ]:CBC 245 : if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
1616 : : {
1617 : 24 : pa_lock_stream(MyParallelShared->xid, AccessShareLock);
1618 : 22 : pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
1619 : : }
1620 : : }
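
The control flow of pa_decr_and_wait_stream_block() can be modeled outside the server with C11 atomics. This is only a sketch under stated assumptions: `demo_shared`, `demo_decr_stream_block`, and the `stream_action` values are hypothetical names, the blocking lock/unlock pair on the stream lock is replaced by a return code, and the error path is modeled with assert() rather than elog(ERROR).

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* What the caller should do after one stream block has been consumed. */
typedef enum
{
    STREAM_CONTINUE,   /* more blocks are already pending */
    STREAM_MUST_WAIT,  /* counter hit zero: wait on the stream lock */
    STREAM_SPOOLED     /* counter was already zero: applying spooled data */
} stream_action;

/* Hypothetical stand-in for the shared state of one parallel worker. */
typedef struct
{
    atomic_uint pending_stream_count;
    bool        spooled_pending;
} demo_shared;

static stream_action
demo_decr_stream_block(demo_shared *s)
{
    /* A zero counter is only legal while spooled messages remain. */
    if (atomic_load(&s->pending_stream_count) == 0)
    {
        assert(s->spooled_pending);
        return STREAM_SPOOLED;
    }

    /*
     * atomic_fetch_sub returns the value before the subtraction, so an old
     * value of 1 means the counter is now 0. (The listing above uses a
     * sub-fetch primitive that returns the new value instead.)
     */
    if (atomic_fetch_sub(&s->pending_stream_count, 1) == 1)
        return STREAM_MUST_WAIT;

    return STREAM_CONTINUE;
}
```

In the listing, the STREAM_MUST_WAIT case corresponds to acquiring and immediately releasing the stream lock in AccessShareLock mode, so the worker blocks only while another process holds that lock in a conflicting mode.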
1621 : :
1622 : : /*
1623 : : * Finish processing the streaming transaction in the leader apply worker.
1624 : : */
1625 : : void
1626 : 25 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
1627 : : {
1628 [ - + ]: 25 : Assert(am_leader_apply_worker());
1629 : :
1630 : : /*
1631 : : * Unlock the shared object lock so that the parallel apply worker can
1632 : : * continue to receive and apply changes.
1633 : : */
1634 : 25 : pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1635 : :
1636 : : /*
1637 : : * Wait for that worker to finish. This is necessary to maintain commit
1638 : : * order which avoids failures due to transaction dependencies and
1639 : : * deadlocks.
1640 : : */
1641 : 25 : pa_wait_for_xact_finish(winfo);
1642 : :
1643 [ + + ]: 24 : if (!XLogRecPtrIsInvalid(remote_lsn))
1644 : 22 : store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1645 : :
1646 : 24 : pa_free_worker(winfo);
1647 : 24 : }