Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * launcher.c
3 : : * PostgreSQL logical replication worker launcher process
4 : : *
5 : : * Copyright (c) 2016-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/launcher.c
9 : : *
10 : : * NOTES
11 : : * This module contains the logical replication worker launcher which
12 : : * uses the background worker infrastructure to start the logical
13 : : * replication workers for every enabled subscription.
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : :
18 : : #include "postgres.h"
19 : :
20 : : #include "access/heapam.h"
21 : : #include "access/htup.h"
22 : : #include "access/htup_details.h"
23 : : #include "access/tableam.h"
24 : : #include "access/xact.h"
25 : : #include "catalog/pg_subscription.h"
26 : : #include "catalog/pg_subscription_rel.h"
27 : : #include "funcapi.h"
28 : : #include "lib/dshash.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "postmaster/bgworker.h"
32 : : #include "postmaster/interrupt.h"
33 : : #include "replication/logicallauncher.h"
34 : : #include "replication/origin.h"
35 : : #include "replication/slot.h"
36 : : #include "replication/walreceiver.h"
37 : : #include "replication/worker_internal.h"
38 : : #include "storage/ipc.h"
39 : : #include "storage/proc.h"
40 : : #include "storage/procarray.h"
41 : : #include "storage/subsystems.h"
42 : : #include "tcop/tcopprot.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/memutils.h"
45 : : #include "utils/pg_lsn.h"
46 : : #include "utils/snapmgr.h"
47 : : #include "utils/syscache.h"
48 : : #include "utils/wait_event.h"
49 : :
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/*
 * GUC variables.
 *
 * max_logical_replication_workers also sizes the shared workers[] array in
 * LogicalRepCtxStruct (see ApplyLauncherShmemInit).
 */
int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;
int			max_parallel_apply_workers_per_subscription = 2;

/*
 * This process's entry in the shared workers[] array; set by
 * logicalrep_worker_attach() and NULL in processes that are not logical
 * replication workers.
 */
LogicalRepWorker *MyLogicalRepWorker = NULL;
59 : :
/*
 * Shared-memory state for the logical replication launcher and its workers.
 * Mutations of the workers[] array are protected by LogicalRepWorkerLock.
 */
typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Hash table holding last start times of subscriptions' apply workers. */
	dsa_handle	last_start_dsa;
	dshash_table_handle last_start_dsh;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

/* Pointer to the shared state; set up by ApplyLauncherShmemInit(). */
static LogicalRepCtxStruct *LogicalRepCtx;
74 : :
static void ApplyLauncherShmemRequest(void *arg);
static void ApplyLauncherShmemInit(void *arg);

/*
 * Shared-memory request/initialization hooks for this subsystem, consumed
 * by the subsystem machinery declared in storage/subsystems.h.
 */
const ShmemCallbacks ApplyLauncherShmemCallbacks = {
	.request_fn = ApplyLauncherShmemRequest,
	.init_fn = ApplyLauncherShmemInit,
};
82 : :
/* an entry in the last-start-times shared hash table */
typedef struct LauncherLastStartTimesEntry
{
	Oid			subid;			/* OID of logrep subscription (hash key) */
	TimestampTz last_start_time;	/* last time its apply worker was started */
} LauncherLastStartTimesEntry;

/* parameters for the last-start-times shared hash table */
static const dshash_parameters dsh_params = {
	sizeof(Oid),				/* key size: subid */
	sizeof(LauncherLastStartTimesEntry),
	dshash_memcmp,
	dshash_memhash,
	dshash_memcpy,
	LWTRANCHE_LAUNCHER_HASH
};

/* Backend-local attachment to the shared last-start-times table. */
static dsa_area *last_start_times_dsa = NULL;
static dshash_table *last_start_times = NULL;

/*
 * NOTE(review): name suggests this flag requests a launcher wakeup at
 * transaction commit; its setters/readers are outside this excerpt —
 * confirm against the rest of the file.
 */
static bool on_commit_launcher_wakeup = false;
104 : :
105 : :
/* Forward declarations of local routines. */
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int	logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
static bool acquire_conflict_slot_if_exists(void);
static void update_conflict_slot_xmin(TransactionId new_xmin);
static void init_conflict_slot_xmin(void);
118 : :
119 : :
120 : : /*
121 : : * Load the list of subscriptions.
122 : : *
123 : : * Only the fields interesting for worker start/stop functions are filled for
124 : : * each subscription.
125 : : */
126 : : static List *
3393 peter_e@gmx.net 127 :CBC 3212 : get_subscription_list(void)
128 : : {
129 : 3212 : List *res = NIL;
130 : : Relation rel;
131 : : TableScanDesc scan;
132 : : HeapTuple tup;
133 : : MemoryContext resultcxt;
134 : :
135 : : /* This is the context that we will allocate our output data in */
136 : 3212 : resultcxt = CurrentMemoryContext;
137 : :
138 : : /*
139 : : * Start a transaction so we can access pg_subscription.
140 : : */
141 : 3212 : StartTransactionCommand();
142 : :
2661 andres@anarazel.de 143 : 3212 : rel = table_open(SubscriptionRelationId, AccessShareLock);
2612 144 : 3212 : scan = table_beginscan_catalog(rel, 0, NULL);
145 : :
3393 peter_e@gmx.net 146 [ + + ]: 4182 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
147 : : {
148 : 970 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
149 : : Subscription *sub;
150 : : MemoryContext oldcxt;
151 : :
152 : : /*
153 : : * Allocate our results in the caller's context, not the
154 : : * transaction's. We do this inside the loop, and restore the original
155 : : * context at the end, so that leaky things like heap_getnext() are
156 : : * not called in a potentially long-lived context.
157 : : */
158 : 970 : oldcxt = MemoryContextSwitchTo(resultcxt);
159 : :
146 michael@paquier.xyz 160 :GNC 970 : sub = palloc0_object(Subscription);
2723 andres@anarazel.de 161 :CBC 970 : sub->oid = subform->oid;
3393 peter_e@gmx.net 162 : 970 : sub->dbid = subform->subdbid;
163 : 970 : sub->owner = subform->subowner;
164 : 970 : sub->enabled = subform->subenabled;
165 : 970 : sub->name = pstrdup(NameStr(subform->subname));
286 akapila@postgresql.o 166 :GNC 970 : sub->retaindeadtuples = subform->subretaindeadtuples;
245 167 : 970 : sub->retentionactive = subform->subretentionactive;
168 : : /* We don't fill fields we are not interested in. */
169 : :
3393 peter_e@gmx.net 170 :CBC 970 : res = lappend(res, sub);
171 : 970 : MemoryContextSwitchTo(oldcxt);
172 : : }
173 : :
2612 andres@anarazel.de 174 : 3212 : table_endscan(scan);
2661 175 : 3212 : table_close(rel, AccessShareLock);
176 : :
3393 peter_e@gmx.net 177 : 3212 : CommitTransactionCommand();
178 : :
179 : 3212 : return res;
180 : : }
181 : :
182 : : /*
183 : : * Wait for a background worker to start up and attach to the shmem context.
184 : : *
185 : : * This is only needed for cleaning up the shared memory in case the worker
186 : : * fails to attach.
187 : : *
188 : : * Returns whether the attach was successful.
189 : : */
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	bool		result = false;
	bool		dropped_latch = false;

	/* Poll until the worker attaches (sets proc) or dies. */
	for (;;)
	{
		BgwHandleStatus status;
		pid_t		pid;
		int			rc;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started. Return false if died. */
		if (!worker->in_use || worker->proc)
		{
			result = worker->in_use;
			LWLockRelease(LogicalRepWorkerLock);
			break;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			break;				/* result is already false */
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach. But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
			/* Remember we consumed a latch event meant for the caller. */
			dropped_latch = true;
		}
	}

	/*
	 * If we had to clear a latch event in order to wait, be sure to restore
	 * it before exiting. Otherwise caller may miss events.
	 */
	if (dropped_latch)
		SetLatch(MyLatch);

	return result;
}
256 : :
257 : : /*
258 : : * Walks the workers array and searches for one that matches given worker type,
259 : : * subscription id, and relation id.
260 : : *
261 : : * For both apply workers and sequencesync workers, the relid should be set to
262 : : * InvalidOid, as these workers handle changes across all tables and sequences
263 : : * respectively, rather than targeting a specific relation. For tablesync
264 : : * workers, the relid should be set to the OID of the relation being
265 : : * synchronized.
266 : : */
267 : : LogicalRepWorker *
189 akapila@postgresql.o 268 :GNC 3425 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
269 : : bool only_running)
270 : : {
271 : : int i;
3275 bruce@momjian.us 272 :CBC 3425 : LogicalRepWorker *res = NULL;
273 : :
274 : : /* relid must be valid only for table sync workers */
189 akapila@postgresql.o 275 [ - + ]:GNC 3425 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
3393 peter_e@gmx.net 276 [ - + ]:CBC 3425 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
277 : :
278 : : /* Search for an attached worker that matches the specified criteria. */
279 [ + + ]: 10436 : for (i = 0; i < max_logical_replication_workers; i++)
280 : : {
3275 bruce@momjian.us 281 : 9135 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
282 : :
283 : : /* Skip parallel apply workers. */
1212 akapila@postgresql.o 284 [ + + - + ]: 9135 : if (isParallelApplyWorker(w))
1212 akapila@postgresql.o 285 :UBC 0 : continue;
286 : :
3296 peter_e@gmx.net 287 [ + + + + :CBC 9135 : if (w->in_use && w->subid == subid && w->relid == relid &&
+ + ]
189 akapila@postgresql.o 288 [ + + + + :GNC 2161 : w->type == wtype && (!only_running || w->proc))
+ - ]
289 : : {
3393 peter_e@gmx.net 290 :CBC 2124 : res = w;
291 : 2124 : break;
292 : : }
293 : : }
294 : :
295 : 3425 : return res;
296 : : }
297 : :
298 : : /*
299 : : * Similar to logicalrep_worker_find(), but returns a list of all workers for
300 : : * the subscription, instead of just one.
301 : : */
302 : : List *
650 akapila@postgresql.o 303 : 777 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
304 : : {
305 : : int i;
3196 peter_e@gmx.net 306 : 777 : List *res = NIL;
307 : :
650 akapila@postgresql.o 308 [ + + ]: 777 : if (acquire_lock)
309 : 147 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
310 : :
3196 peter_e@gmx.net 311 [ - + ]: 777 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
312 : :
313 : : /* Search for attached worker for a given subscription id. */
314 [ + + ]: 4011 : for (i = 0; i < max_logical_replication_workers; i++)
315 : : {
316 : 3234 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
317 : :
318 [ + + + + : 3234 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ + + + ]
319 : 520 : res = lappend(res, w);
320 : : }
321 : :
650 akapila@postgresql.o 322 [ + + ]: 777 : if (acquire_lock)
323 : 147 : LWLockRelease(LogicalRepWorkerLock);
324 : :
3196 peter_e@gmx.net 325 : 777 : return res;
326 : : }
327 : :
328 : : /*
329 : : * Start new logical replication background worker, if possible.
330 : : *
331 : : * Returns true on success, false on failure.
332 : : */
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
						 Oid dbid, Oid subid, const char *subname, Oid userid,
						 Oid relid, dsm_handle subworker_dsm,
						 bool retain_dead_tuples)
{
	BackgroundWorker bgw;
	BackgroundWorkerHandle *bgw_handle;
	uint16		generation;
	int			i;
	int			slot = 0;
	LogicalRepWorker *worker = NULL;
	int			nsyncworkers;
	int			nparallelapplyworkers;
	TimestampTz now;
	bool		is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
	bool		is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
	bool		is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);

	/*----------
	 * Sanity checks:
	 * - must be valid worker type
	 * - tablesync workers are only ones to have relid
	 * - parallel apply worker is the only kind of subworker
	 * - The replication slot used in conflict detection is created when
	 *   retain_dead_tuples is enabled
	 */
	Assert(wtype != WORKERTYPE_UNKNOWN);
	Assert(is_tablesync_worker == OidIsValid(relid));
	Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
	Assert(!retain_dead_tuples || MyReplicationSlot);

	ereport(DEBUG1,
			(errmsg_internal("starting logical replication worker for subscription \"%s\"",
							 subname)));

	/* Report this after the initial starting message for consistency. */
	if (max_active_replication_origins == 0)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));

	/*
	 * We need to do the modification of the shared memory under lock so that
	 * we have consistent view.
	 */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
	/* Find unused worker slot. */
	for (i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (!w->in_use)
		{
			worker = w;
			slot = i;
			break;
		}
	}

	nsyncworkers = logicalrep_sync_worker_count(subid);

	now = GetCurrentTimestamp();

	/*
	 * If we didn't find a free slot, try to do garbage collection.  The
	 * reason we do this is because if some worker failed to start up and its
	 * parent has crashed while waiting, the in_use state was never cleared.
	 */
	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
	{
		bool		did_cleanup = false;

		for (i = 0; i < max_logical_replication_workers; i++)
		{
			LogicalRepWorker *w = &LogicalRepCtx->workers[i];

			/*
			 * If the worker was marked in use but didn't manage to attach in
			 * time, clean it up.
			 */
			if (w->in_use && !w->proc &&
				TimestampDifferenceExceeds(w->launch_time, now,
										   wal_receiver_timeout))
			{
				elog(WARNING,
					 "logical replication worker for subscription %u took too long to start; canceled",
					 w->subid);

				logicalrep_worker_cleanup(w);
				did_cleanup = true;
			}
		}

		/* Reclaimed at least one slot, so search again. */
		if (did_cleanup)
			goto retry;
	}

	/*
	 * We don't allow to invoke more sync workers once we have reached the
	 * sync worker limit per subscription.  So, just return silently as we
	 * might get here because of an otherwise harmless race condition.
	 */
	if ((is_tablesync_worker || is_sequencesync_worker) &&
		nsyncworkers >= max_sync_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	nparallelapplyworkers = logicalrep_pa_worker_count(subid);

	/*
	 * Return false if the number of parallel apply workers reached the limit
	 * per subscription.
	 */
	if (is_parallel_apply_worker &&
		nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return false;
	}

	/*
	 * However if there are no more free worker slots, inform user about it
	 * before exiting.
	 */
	if (worker == NULL)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of logical replication worker slots"),
				 errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
		return false;
	}

	/* Prepare the worker slot. */
	worker->type = wtype;
	worker->launch_time = now;
	worker->in_use = true;
	worker->generation++;		/* distinguishes reuses of this slot */
	worker->proc = NULL;
	worker->dbid = dbid;
	worker->userid = userid;
	worker->subid = subid;
	worker->relid = relid;
	worker->relstate = SUBREL_STATE_UNKNOWN;
	worker->relstate_lsn = InvalidXLogRecPtr;
	worker->stream_fileset = NULL;
	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
	worker->parallel_apply = is_parallel_apply_worker;
	/* Seed the nonremovable xid from the conflict-detection slot, if any. */
	worker->oldest_nonremovable_xid = retain_dead_tuples
		? MyReplicationSlot->data.xmin
		: InvalidTransactionId;
	worker->last_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->last_send_time);
	TIMESTAMP_NOBEGIN(worker->last_recv_time);
	worker->reply_lsn = InvalidXLogRecPtr;
	TIMESTAMP_NOBEGIN(worker->reply_time);
	worker->last_seqsync_start_time = 0;

	/* Before releasing lock, remember generation for future identification. */
	generation = worker->generation;

	LWLockRelease(LogicalRepWorkerLock);

	/* Register the new dynamic worker. */
	memset(&bgw, 0, sizeof(bgw));
	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");

	/* Pick the entry point and display names for this worker type. */
	switch (worker->type)
	{
		case WORKERTYPE_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
			break;

		case WORKERTYPE_PARALLEL_APPLY:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication parallel apply worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");

			/* Pass the DSM handle to the parallel apply worker. */
			memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
			break;

		case WORKERTYPE_SEQUENCESYNC:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication sequencesync worker for subscription %u",
					 subid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
			break;

		case WORKERTYPE_TABLESYNC:
			snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
			snprintf(bgw.bgw_name, BGW_MAXLEN,
					 "logical replication tablesync worker for subscription %u sync %u",
					 subid,
					 relid);
			snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
			break;

		case WORKERTYPE_UNKNOWN:
			/* Should never happen. */
			elog(ERROR, "unknown worker type");
	}

	bgw.bgw_restart_time = BGW_NEVER_RESTART;
	bgw.bgw_notify_pid = MyProcPid;
	bgw.bgw_main_arg = Int32GetDatum(slot);

	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
	{
		/* Failed to start worker, so clean up the worker slot. */
		LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
		Assert(generation == worker->generation);
		logicalrep_worker_cleanup(worker);
		LWLockRelease(LogicalRepWorkerLock);

		ereport(WARNING,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("out of background worker slots"),
				 errhint("You might need to increase \"%s\".", "max_worker_processes")));
		return false;
	}

	/* Now wait until it attaches. */
	return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
573 : :
574 : : /*
575 : : * Internal function to stop the worker and wait until it detaches from the
576 : : * slot.
577 : : */
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
	uint16		generation;

	/* Caller must hold the lock; it is held again when we return. */
	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));

	/*
	 * Remember which generation was our worker so we can check if what we see
	 * is still the same one.
	 */
	generation = worker->generation;

	/*
	 * If we found a worker but it does not have proc set then it is still
	 * starting up; wait for it to finish starting and then kill it.
	 */
	while (worker->in_use && !worker->proc)
	{
		int			rc;

		/* Drop the lock while sleeping so the worker can attach. */
		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Recheck worker status. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/*
		 * Check whether the worker slot is no longer used, which would mean
		 * that the worker has exited, or whether the worker generation is
		 * different, meaning that a different worker has taken the slot.
		 */
		if (!worker->in_use || worker->generation != generation)
			return;

		/* Worker has assigned proc, so it has started. */
		if (worker->proc)
			break;
	}

	/* Now terminate the worker ... */
	kill(worker->proc->pid, signo);

	/* ... and wait for it to die. */
	for (;;)
	{
		int			rc;

		/* is it gone? */
		if (!worker->proc || worker->generation != generation)
			break;

		/* Release the lock while sleeping so the worker can detach. */
		LWLockRelease(LogicalRepWorkerLock);

		/* Wait a bit --- we don't expect to have to wait long. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	}
}
656 : :
657 : : /*
658 : : * Stop the logical replication worker that matches the specified worker type,
659 : : * subscription id, and relation id.
660 : : */
661 : : void
189 akapila@postgresql.o 662 :GNC 103 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
663 : : {
664 : : LogicalRepWorker *worker;
665 : :
666 : : /* relid must be valid only for table sync workers */
667 [ - + ]: 103 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
668 : :
1212 akapila@postgresql.o 669 :CBC 103 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
670 : :
189 akapila@postgresql.o 671 :GNC 103 : worker = logicalrep_worker_find(wtype, subid, relid, false);
672 : :
1212 akapila@postgresql.o 673 [ + + ]:CBC 103 : if (worker)
674 : : {
675 [ + - - + ]: 81 : Assert(!isParallelApplyWorker(worker));
676 : 81 : logicalrep_worker_stop_internal(worker, SIGTERM);
677 : : }
678 : :
679 : 103 : LWLockRelease(LogicalRepWorkerLock);
680 : 103 : }
681 : :
682 : : /*
683 : : * Stop the given logical replication parallel apply worker.
684 : : *
685 : : * Node that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
686 : : * worker so that the worker exits cleanly.
687 : : */
void
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
{
	int			slot_no;
	uint16		generation;
	LogicalRepWorker *worker;

	/* Snapshot the worker's slot number and generation under its mutex. */
	SpinLockAcquire(&winfo->shared->mutex);
	generation = winfo->shared->logicalrep_worker_generation;
	slot_no = winfo->shared->logicalrep_worker_slot_no;
	SpinLockRelease(&winfo->shared->mutex);

	Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);

	/*
	 * Detach from the error_mq_handle for the parallel apply worker before
	 * stopping it. This prevents the leader apply worker from trying to
	 * receive the message from the error queue that might already be detached
	 * by the parallel apply worker.
	 */
	if (winfo->error_mq_handle)
	{
		shm_mq_detach(winfo->error_mq_handle);
		winfo->error_mq_handle = NULL;
	}

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	worker = &LogicalRepCtx->workers[slot_no];
	Assert(isParallelApplyWorker(worker));

	/*
	 * Only stop the worker if the generation matches and the worker is alive.
	 */
	if (worker->generation == generation && worker->proc)
		logicalrep_worker_stop_internal(worker, SIGUSR2);

	LWLockRelease(LogicalRepWorkerLock);
}
727 : :
728 : : /*
729 : : * Wake up (using latch) any logical replication worker that matches the
730 : : * specified worker type, subscription id, and relation id.
731 : : */
732 : : void
189 akapila@postgresql.o 733 :GNC 227 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
734 : : {
735 : : LogicalRepWorker *worker;
736 : :
737 : : /* relid must be valid only for table sync workers */
738 [ - + ]: 227 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
739 : :
3330 peter_e@gmx.net 740 :CBC 227 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
741 : :
189 akapila@postgresql.o 742 :GNC 227 : worker = logicalrep_worker_find(wtype, subid, relid, true);
743 : :
3330 peter_e@gmx.net 744 [ + - ]:CBC 227 : if (worker)
745 : 227 : logicalrep_worker_wakeup_ptr(worker);
746 : :
3231 tgl@sss.pgh.pa.us 747 : 227 : LWLockRelease(LogicalRepWorkerLock);
3330 peter_e@gmx.net 748 : 227 : }
749 : :
750 : : /*
751 : : * Wake up (using latch) the specified logical replication worker.
752 : : *
753 : : * Caller must hold lock, else worker->proc could change under us.
754 : : */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
	/* Caller must hold lock, else worker->proc could change under us. */
	Assert(LWLockHeldByMe(LogicalRepWorkerLock));

	SetLatch(&worker->proc->procLatch);
}
762 : :
763 : : /*
764 : : * Attach to a slot.
765 : : */
void
logicalrep_worker_attach(int slot)
{
	/* Block concurrent access. */
	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

	Assert(slot >= 0 && slot < max_logical_replication_workers);
	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

	/* The launcher must have reserved this slot for us. */
	if (!MyLogicalRepWorker->in_use)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is empty, cannot attach",
						slot)));
	}

	/* ... and no other process may have claimed it already. */
	if (MyLogicalRepWorker->proc)
	{
		LWLockRelease(LogicalRepWorkerLock);
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication worker slot %d is already used by "
						"another worker, cannot attach", slot)));
	}

	/* Claim the slot and arrange for cleanup at process exit. */
	MyLogicalRepWorker->proc = MyProc;
	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

	LWLockRelease(LogicalRepWorkerLock);
}
798 : :
799 : : /*
800 : : * Stop the parallel apply workers if any, and detach the leader apply worker
801 : : * (cleans up the worker info).
802 : : */
803 : : static void
804 : 592 : logicalrep_worker_detach(void)
805 : : {
806 : : /* Stop the parallel apply workers. */
1212 akapila@postgresql.o 807 [ + + ]: 592 : if (am_leader_apply_worker())
808 : : {
809 : : List *workers;
810 : : ListCell *lc;
811 : :
812 : : /*
813 : : * Detach from the error_mq_handle for all parallel apply workers
814 : : * before terminating them. This prevents the leader apply worker from
815 : : * receiving the worker termination message and sending it to logs
816 : : * when the same is already done by the parallel worker.
817 : : */
818 : 356 : pa_detach_all_error_mq();
819 : :
820 : 356 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
821 : :
650 822 : 356 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
1212 823 [ + - + + : 717 : foreach(lc, workers)
+ + ]
824 : : {
825 : 361 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
826 : :
827 [ + - + + ]: 361 : if (isParallelApplyWorker(w))
828 : 4 : logicalrep_worker_stop_internal(w, SIGTERM);
829 : : }
830 : :
831 : 356 : LWLockRelease(LogicalRepWorkerLock);
832 : :
276 tgl@sss.pgh.pa.us 833 :GNC 356 : list_free(workers);
834 : : }
835 : :
836 : : /* Block concurrent access. */
3393 peter_e@gmx.net 837 :CBC 592 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
838 : :
3296 839 : 592 : logicalrep_worker_cleanup(MyLogicalRepWorker);
840 : :
3393 841 : 592 : LWLockRelease(LogicalRepWorkerLock);
842 : 592 : }
843 : :
844 : : /*
845 : : * Clean up worker info.
846 : : */
847 : : static void
3296 848 : 592 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
849 : : {
850 [ - + ]: 592 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
851 : :
984 akapila@postgresql.o 852 : 592 : worker->type = WORKERTYPE_UNKNOWN;
3296 peter_e@gmx.net 853 : 592 : worker->in_use = false;
854 : 592 : worker->proc = NULL;
855 : 592 : worker->dbid = InvalidOid;
856 : 592 : worker->userid = InvalidOid;
857 : 592 : worker->subid = InvalidOid;
858 : 592 : worker->relid = InvalidOid;
1203 akapila@postgresql.o 859 : 592 : worker->leader_pid = InvalidPid;
1212 860 : 592 : worker->parallel_apply = false;
3296 peter_e@gmx.net 861 : 592 : }
862 : :
863 : : /*
864 : : * Cleanup function for logical replication launcher.
865 : : *
866 : : * Called on logical replication launcher exit.
867 : : */
868 : : static void
3303 fujii@postgresql.org 869 : 513 : logicalrep_launcher_onexit(int code, Datum arg)
870 : : {
871 : 513 : LogicalRepCtx->launcher_pid = 0;
872 : 513 : }
873 : :
874 : : /*
875 : : * Reset the last_seqsync_start_time of the sequencesync worker in the
876 : : * subscription's apply worker.
877 : : *
878 : : * Note that this value is not stored in the sequencesync worker, because that
879 : : * has finished already and is about to exit.
880 : : */
881 : : void
181 akapila@postgresql.o 882 :GNC 5 : logicalrep_reset_seqsync_start_time(void)
883 : : {
884 : : LogicalRepWorker *worker;
885 : :
886 : : /*
887 : : * The apply worker can't access last_seqsync_start_time concurrently, so
888 : : * it is okay to use SHARED lock here. See ProcessSequencesForSync().
889 : : */
890 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
891 : :
892 : 5 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
893 : 5 : MyLogicalRepWorker->subid, InvalidOid,
894 : : true);
895 [ + - ]: 5 : if (worker)
896 : 5 : worker->last_seqsync_start_time = 0;
897 : :
898 : 5 : LWLockRelease(LogicalRepWorkerLock);
899 : 5 : }
900 : :
901 : : /*
902 : : * Cleanup function.
903 : : *
904 : : * Called on logical replication worker exit.
905 : : */
906 : : static void
3393 peter_e@gmx.net 907 :CBC 592 : logicalrep_worker_onexit(int code, Datum arg)
908 : : {
909 : : /* Disconnect gracefully from the remote side. */
1819 alvherre@alvh.no-ip. 910 [ + + ]: 592 : if (LogRepWorkerWalRcvConn)
911 : 471 : walrcv_disconnect(LogRepWorkerWalRcvConn);
912 : :
3393 peter_e@gmx.net 913 : 592 : logicalrep_worker_detach();
914 : :
915 : : /* Cleanup fileset used for streaming transactions. */
1706 akapila@postgresql.o 916 [ + + ]: 592 : if (MyLogicalRepWorker->stream_fileset != NULL)
917 : 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
918 : :
919 : : /*
920 : : * Session level locks may be acquired outside of a transaction in
921 : : * parallel apply mode and will not be released when the worker
922 : : * terminates, so manually release all locks before the worker exits.
923 : : *
924 : : * The locks will be acquired once the worker is initialized.
925 : : */
1098 926 [ + + ]: 592 : if (!InitializingApplyWorker)
927 : 520 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
928 : :
3260 peter_e@gmx.net 929 : 592 : ApplyLauncherWakeup();
3393 930 : 592 : }
931 : :
932 : : /*
933 : : * Count the number of registered (not necessarily running) sync workers
934 : : * for a subscription.
935 : : */
936 : : int
3330 937 : 1384 : logicalrep_sync_worker_count(Oid subid)
938 : : {
939 : : int i;
3275 bruce@momjian.us 940 : 1384 : int res = 0;
941 : :
3330 peter_e@gmx.net 942 [ - + ]: 1384 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
943 : :
944 : : /* Search for attached worker for a given subscription id. */
945 [ + + ]: 7126 : for (i = 0; i < max_logical_replication_workers; i++)
946 : : {
3275 bruce@momjian.us 947 : 5742 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
948 : :
181 akapila@postgresql.o 949 [ + + + - :GNC 5742 : if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
+ + + - -
+ ]
3330 peter_e@gmx.net 950 :CBC 1514 : res++;
951 : : }
952 : :
953 : 1384 : return res;
954 : : }
955 : :
956 : : /*
957 : : * Count the number of registered (but not necessarily running) parallel apply
958 : : * workers for a subscription.
959 : : */
960 : : static int
1212 akapila@postgresql.o 961 : 457 : logicalrep_pa_worker_count(Oid subid)
962 : : {
963 : : int i;
964 : 457 : int res = 0;
965 : :
966 [ - + ]: 457 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
967 : :
968 : : /*
969 : : * Scan all attached parallel apply workers, only counting those which
970 : : * have the given subscription id.
971 : : */
972 [ + + ]: 2409 : for (i = 0; i < max_logical_replication_workers; i++)
973 : : {
974 : 1952 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
975 : :
984 976 [ + + + + : 1952 : if (isParallelApplyWorker(w) && w->subid == subid)
+ - ]
1212 977 : 2 : res++;
978 : : }
979 : :
980 : 457 : return res;
981 : : }
982 : :
983 : : /*
984 : : * ApplyLauncherShmemRequest
985 : : * Register shared memory space needed for replication launcher
986 : : */
987 : : static void
29 heikki.linnakangas@i 988 :GNC 1244 : ApplyLauncherShmemRequest(void *arg)
989 : : {
990 : : Size size;
991 : :
992 : : /*
993 : : * Need the fixed struct and the array of LogicalRepWorker.
994 : : */
3393 peter_e@gmx.net 995 :CBC 1244 : size = sizeof(LogicalRepCtxStruct);
996 : 1244 : size = MAXALIGN(size);
997 : 1244 : size = add_size(size, mul_size(max_logical_replication_workers,
998 : : sizeof(LogicalRepWorker)));
29 heikki.linnakangas@i 999 :GNC 1244 : ShmemRequestStruct(.name = "Logical Replication Launcher Data",
1000 : : .size = size,
1001 : : .ptr = (void **) &LogicalRepCtx,
1002 : : );
3393 peter_e@gmx.net 1003 :GIC 1244 : }
1004 : :
1005 : : /*
1006 : : * ApplyLauncherRegister
1007 : : * Register a background worker running the logical replication launcher.
1008 : : */
1009 : : void
3393 peter_e@gmx.net 1010 :CBC 1004 : ApplyLauncherRegister(void)
1011 : : {
1012 : : BackgroundWorker bgw;
1013 : :
1014 : : /*
1015 : : * The logical replication launcher is disabled during binary upgrades, to
1016 : : * prevent logical replication workers from running on the source cluster.
1017 : : * That could cause replication origins to move forward after having been
1018 : : * copied to the target cluster, potentially creating conflicts with the
1019 : : * copied data files.
1020 : : */
844 michael@paquier.xyz 1021 [ + + + + ]: 1004 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
3393 peter_e@gmx.net 1022 : 61 : return;
1023 : :
3306 tgl@sss.pgh.pa.us 1024 : 943 : memset(&bgw, 0, sizeof(bgw));
3275 bruce@momjian.us 1025 : 943 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
1026 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3393 peter_e@gmx.net 1027 : 943 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
1037 nathan@postgresql.or 1028 : 943 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3322 rhaas@postgresql.org 1029 : 943 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
3393 peter_e@gmx.net 1030 : 943 : snprintf(bgw.bgw_name, BGW_MAXLEN,
1031 : : "logical replication launcher");
3169 1032 : 943 : snprintf(bgw.bgw_type, BGW_MAXLEN,
1033 : : "logical replication launcher");
3393 1034 : 943 : bgw.bgw_restart_time = 5;
1035 : 943 : bgw.bgw_notify_pid = 0;
1036 : 943 : bgw.bgw_main_arg = (Datum) 0;
1037 : :
1038 : 943 : RegisterBackgroundWorker(&bgw);
1039 : : }
1040 : :
1041 : : /*
1042 : : * ApplyLauncherShmemInit
1043 : : * Initialize replication launcher shared memory
1044 : : */
1045 : : static void
29 heikki.linnakangas@i 1046 :GNC 1241 : ApplyLauncherShmemInit(void *arg)
1047 : : {
1048 : : int slot;
1049 : :
1050 : 1241 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
1051 : 1241 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
1052 : :
1053 : : /* Initialize memory and spin locks for each worker slot. */
1054 [ + + ]: 6168 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1055 : : {
1056 : 4927 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1057 : :
1058 : 4927 : memset(worker, 0, sizeof(LogicalRepWorker));
1059 : 4927 : SpinLockInit(&worker->relmutex);
1060 : : }
3393 peter_e@gmx.net 1061 :CBC 1241 : }
1062 : :
1063 : : /*
1064 : : * Initialize or attach to the dynamic shared hash table that stores the
1065 : : * last-start times, if not already done.
1066 : : * This must be called before accessing the table.
1067 : : */
1068 : : static void
1199 tgl@sss.pgh.pa.us 1069 : 841 : logicalrep_launcher_attach_dshmem(void)
1070 : : {
1071 : : MemoryContext oldcontext;
1072 : :
1073 : : /* Quick exit if we already did this. */
1196 1074 [ + + ]: 841 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1199 1075 [ + + ]: 781 : last_start_times != NULL)
1076 : 575 : return;
1077 : :
1078 : : /* Otherwise, use a lock to ensure only one process creates the table. */
1079 : 266 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1080 : :
1081 : : /* Be sure any local memory allocated by DSA routines is persistent. */
1082 : 266 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1083 : :
1196 1084 [ + + ]: 266 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1085 : : {
1086 : : /* Initialize dynamic shared hash table for last-start times. */
1199 1087 : 60 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1088 : 60 : dsa_pin(last_start_times_dsa);
1089 : 60 : dsa_pin_mapping(last_start_times_dsa);
799 nathan@postgresql.or 1090 : 60 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1091 : :
1092 : : /* Store handles in shared memory for other backends to use. */
1199 tgl@sss.pgh.pa.us 1093 : 60 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1094 : 60 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1095 : : }
1096 [ + - ]: 206 : else if (!last_start_times)
1097 : : {
1098 : : /* Attach to existing dynamic shared hash table. */
1099 : 206 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1100 : 206 : dsa_pin_mapping(last_start_times_dsa);
1101 : 206 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
333 nathan@postgresql.or 1102 : 206 : LogicalRepCtx->last_start_dsh, NULL);
1103 : : }
1104 : :
1199 tgl@sss.pgh.pa.us 1105 : 266 : MemoryContextSwitchTo(oldcontext);
1106 : 266 : LWLockRelease(LogicalRepWorkerLock);
1107 : : }
1108 : :
1109 : : /*
1110 : : * Set the last-start time for the subscription.
1111 : : */
1112 : : static void
1113 : 223 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1114 : : {
1115 : : LauncherLastStartTimesEntry *entry;
1116 : : bool found;
1117 : :
1118 : 223 : logicalrep_launcher_attach_dshmem();
1119 : :
1120 : 223 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1121 : 223 : entry->last_start_time = start_time;
1122 : 223 : dshash_release_lock(last_start_times, entry);
1123 : 223 : }
1124 : :
1125 : : /*
1126 : : * Return the last-start time for the subscription, or 0 if there isn't one.
1127 : : */
1128 : : static TimestampTz
1129 : 352 : ApplyLauncherGetWorkerStartTime(Oid subid)
1130 : : {
1131 : : LauncherLastStartTimesEntry *entry;
1132 : : TimestampTz ret;
1133 : :
1134 : 352 : logicalrep_launcher_attach_dshmem();
1135 : :
1136 : 352 : entry = dshash_find(last_start_times, &subid, false);
1137 [ + + ]: 352 : if (entry == NULL)
1138 : 130 : return 0;
1139 : :
1140 : 222 : ret = entry->last_start_time;
1141 : 222 : dshash_release_lock(last_start_times, entry);
1142 : :
1143 : 222 : return ret;
1144 : : }
1145 : :
1146 : : /*
1147 : : * Remove the last-start-time entry for the subscription, if one exists.
1148 : : *
1149 : : * This has two use-cases: to remove the entry related to a subscription
1150 : : * that's been deleted or disabled (just to avoid leaking shared memory),
1151 : : * and to allow immediate restart of an apply worker that has exited
1152 : : * due to subscription parameter changes.
1153 : : */
1154 : : void
1155 : 266 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1156 : : {
1157 : 266 : logicalrep_launcher_attach_dshmem();
1158 : :
1159 : 266 : (void) dshash_delete_key(last_start_times, &subid);
1160 : 266 : }
1161 : :
1162 : : /*
1163 : : * Wakeup the launcher on commit if requested.
1164 : : */
1165 : : void
3291 peter_e@gmx.net 1166 : 423274 : AtEOXact_ApplyLauncher(bool isCommit)
1167 : : {
3196 1168 [ + + ]: 423274 : if (isCommit)
1169 : : {
1170 [ + + ]: 387624 : if (on_commit_launcher_wakeup)
1171 : 154 : ApplyLauncherWakeup();
1172 : : }
1173 : :
3291 1174 : 423274 : on_commit_launcher_wakeup = false;
3393 1175 : 423274 : }
1176 : :
1177 : : /*
1178 : : * Request wakeup of the launcher on commit of the transaction.
1179 : : *
1180 : : * This is used to send launcher signal to stop sleeping and process the
1181 : : * subscriptions when current transaction commits. Should be used when new
1182 : : * tuple was added to the pg_subscription catalog.
1183 : : */
1184 : : void
1185 : 155 : ApplyLauncherWakeupAtCommit(void)
1186 : : {
3375 heikki.linnakangas@i 1187 [ + + ]: 155 : if (!on_commit_launcher_wakeup)
1188 : 154 : on_commit_launcher_wakeup = true;
3393 peter_e@gmx.net 1189 : 155 : }
1190 : :
1191 : : /*
1192 : : * Wakeup the launcher immediately.
1193 : : */
1194 : : void
1195 : 783 : ApplyLauncherWakeup(void)
1196 : : {
3303 fujii@postgresql.org 1197 [ + + ]: 783 : if (LogicalRepCtx->launcher_pid != 0)
3393 peter_e@gmx.net 1198 : 754 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1199 : 783 : }
1200 : :
1201 : : /*
1202 : : * Main loop for the apply launcher process.
1203 : : */
1204 : : void
1205 : 513 : ApplyLauncherMain(Datum main_arg)
1206 : : {
3343 tgl@sss.pgh.pa.us 1207 [ + + ]: 513 : ereport(DEBUG1,
1208 : : (errmsg_internal("logical replication launcher started")));
1209 : :
3303 fujii@postgresql.org 1210 : 513 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1211 : :
3253 andres@anarazel.de 1212 [ - + ]: 513 : Assert(LogicalRepCtx->launcher_pid == 0);
1213 : 513 : LogicalRepCtx->launcher_pid = MyProcPid;
1214 : :
1215 : : /* Establish signal handlers. */
2118 akapila@postgresql.o 1216 : 513 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
3393 peter_e@gmx.net 1217 : 513 : BackgroundWorkerUnblockSignals();
1218 : :
1219 : : /*
1220 : : * Establish connection to nailed catalogs (we only ever access
1221 : : * pg_subscription).
1222 : : */
2952 magnus@hagander.net 1223 : 513 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1224 : :
1225 : : /*
1226 : : * Acquire the conflict detection slot at startup to ensure it can be
1227 : : * dropped if no longer needed after a restart.
1228 : : */
286 akapila@postgresql.o 1229 :GNC 513 : acquire_conflict_slot_if_exists();
1230 : :
1231 : : /* Enter main loop */
1232 : : for (;;)
3393 peter_e@gmx.net 1233 :CBC 2700 : {
1234 : : int rc;
1235 : : List *sublist;
1236 : : ListCell *lc;
1237 : : MemoryContext subctx;
1238 : : MemoryContext oldctx;
3275 bruce@momjian.us 1239 : 3213 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
245 akapila@postgresql.o 1240 :GNC 3213 : bool can_update_xmin = true;
286 1241 : 3213 : bool retain_dead_tuples = false;
1242 : 3213 : TransactionId xmin = InvalidTransactionId;
1243 : :
3253 andres@anarazel.de 1244 [ + + ]:CBC 3213 : CHECK_FOR_INTERRUPTS();
1245 : :
1246 : : /* Use temporary context to avoid leaking memory across cycles. */
1199 tgl@sss.pgh.pa.us 1247 : 3212 : subctx = AllocSetContextCreate(TopMemoryContext,
1248 : : "Logical Replication Launcher sublist",
1249 : : ALLOCSET_DEFAULT_SIZES);
1250 : 3212 : oldctx = MemoryContextSwitchTo(subctx);
1251 : :
1252 : : /*
1253 : : * Start any missing workers for enabled subscriptions.
1254 : : *
1255 : : * Also, during the iteration through all subscriptions, we compute
1256 : : * the minimum XID required to protect deleted tuples for conflict
1257 : : * detection if one of the subscription enables retain_dead_tuples
1258 : : * option.
1259 : : */
1260 : 3212 : sublist = get_subscription_list();
1261 [ + + + + : 4180 : foreach(lc, sublist)
+ + ]
1262 : : {
1263 : 970 : Subscription *sub = (Subscription *) lfirst(lc);
1264 : : LogicalRepWorker *w;
1265 : : TimestampTz last_start;
1266 : : TimestampTz now;
1267 : : long elapsed;
1268 : :
286 akapila@postgresql.o 1269 [ + + ]:GNC 970 : if (sub->retaindeadtuples)
1270 : : {
1271 : 69 : retain_dead_tuples = true;
1272 : :
1273 : : /*
1274 : : * Create a replication slot to retain information necessary
1275 : : * for conflict detection such as dead tuples, commit
1276 : : * timestamps, and origins.
1277 : : *
1278 : : * The slot is created before starting the apply worker to
1279 : : * prevent it from unnecessarily maintaining its
1280 : : * oldest_nonremovable_xid.
1281 : : *
1282 : : * The slot is created even for a disabled subscription to
1283 : : * ensure that conflict-related information is available when
1284 : : * applying remote changes that occurred before the
1285 : : * subscription was enabled.
1286 : : */
1287 : 69 : CreateConflictDetectionSlot();
1288 : :
245 1289 [ + - ]: 69 : if (sub->retentionactive)
1290 : : {
1291 : : /*
1292 : : * Can't advance xmin of the slot unless all the
1293 : : * subscriptions actively retaining dead tuples are
1294 : : * enabled. This is required to ensure that we don't
1295 : : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1296 : : * the subscriptions is not enabled. Otherwise, we won't
1297 : : * be able to detect conflicts reliably for such a
1298 : : * subscription even though it has set the
1299 : : * retain_dead_tuples option.
1300 : : */
1301 : 69 : can_update_xmin &= sub->enabled;
1302 : :
1303 : : /*
1304 : : * Initialize the slot once the subscription activates
1305 : : * retention.
1306 : : */
1307 [ - + ]: 69 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
245 akapila@postgresql.o 1308 :UNC 0 : init_conflict_slot_xmin();
1309 : : }
1310 : : }
1311 : :
1199 tgl@sss.pgh.pa.us 1312 [ + + ]:CBC 970 : if (!sub->enabled)
1313 : 40 : continue;
1314 : :
1315 : 930 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
189 akapila@postgresql.o 1316 :GNC 930 : w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
1317 : : false);
1318 : :
1199 tgl@sss.pgh.pa.us 1319 [ + + ]:CBC 930 : if (w != NULL)
1320 : : {
1321 : : /*
1322 : : * Compute the minimum xmin required to protect dead tuples
1323 : : * required for conflict detection among all running apply
1324 : : * workers. This computation is performed while holding
1325 : : * LogicalRepWorkerLock to prevent accessing invalid worker
1326 : : * data, in scenarios where a worker might exit and reset its
1327 : : * state concurrently.
1328 : : */
245 akapila@postgresql.o 1329 [ + + ]:GNC 578 : if (sub->retaindeadtuples &&
1330 [ + - + - ]: 64 : sub->retentionactive &&
1331 : : can_update_xmin)
286 1332 : 64 : compute_min_nonremovable_xid(w, &xmin);
1333 : :
232 1334 : 578 : LWLockRelease(LogicalRepWorkerLock);
1335 : :
1336 : : /* worker is running already */
286 1337 : 578 : continue;
1338 : : }
1339 : :
232 1340 : 352 : LWLockRelease(LogicalRepWorkerLock);
1341 : :
1342 : : /*
1343 : : * Can't advance xmin of the slot unless all the workers
1344 : : * corresponding to subscriptions actively retaining dead tuples
1345 : : * are running, disabling the further computation of the minimum
1346 : : * nonremovable xid.
1347 : : */
245 1348 [ + + + - ]: 352 : if (sub->retaindeadtuples && sub->retentionactive)
1349 : 2 : can_update_xmin = false;
1350 : :
1351 : : /*
1352 : : * If the worker is eligible to start now, launch it. Otherwise,
1353 : : * adjust wait_time so that we'll wake up as soon as it can be
1354 : : * started.
1355 : : *
1356 : : * Each subscription's apply worker can only be restarted once per
1357 : : * wal_retrieve_retry_interval, so that errors do not cause us to
1358 : : * repeatedly restart the worker as fast as possible. In cases
1359 : : * where a restart is expected (e.g., subscription parameter
1360 : : * changes), another process should remove the last-start entry
1361 : : * for the subscription so that the worker can be restarted
1362 : : * without waiting for wal_retrieve_retry_interval to elapse.
1363 : : */
1199 tgl@sss.pgh.pa.us 1364 :CBC 352 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1365 : 352 : now = GetCurrentTimestamp();
1366 [ + + ]: 352 : if (last_start == 0 ||
1367 [ + + ]: 222 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1368 : : {
1369 : 223 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
315 1370 [ + + ]: 222 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1371 : 223 : sub->dbid, sub->oid, sub->name,
1372 : : sub->owner, InvalidOid,
1373 : : DSM_HANDLE_INVALID,
245 akapila@postgresql.o 1374 [ + + ]:GNC 225 : sub->retaindeadtuples &&
1375 [ + - ]: 2 : sub->retentionactive))
1376 : : {
1377 : : /*
1378 : : * We get here either if we failed to launch a worker
1379 : : * (perhaps for resource-exhaustion reasons) or if we
1380 : : * launched one but it immediately quit. Either way, it
1381 : : * seems appropriate to try again after
1382 : : * wal_retrieve_retry_interval.
1383 : : */
315 tgl@sss.pgh.pa.us 1384 :GBC 1 : wait_time = Min(wait_time,
1385 : : wal_retrieve_retry_interval);
1386 : : }
1387 : : }
1388 : : else
1389 : : {
1199 tgl@sss.pgh.pa.us 1390 :CBC 129 : wait_time = Min(wait_time,
1391 : : wal_retrieve_retry_interval - elapsed);
1392 : : }
1393 : : }
1394 : :
1395 : : /*
1396 : : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1397 : : * that requires us to retain dead tuples. Otherwise, if required,
1398 : : * advance the slot's xmin to protect dead tuples required for the
1399 : : * conflict detection.
1400 : : *
1401 : : * Additionally, if all apply workers for subscriptions with
1402 : : * retain_dead_tuples enabled have requested to stop retention, the
1403 : : * slot's xmin will be set to InvalidTransactionId allowing the
1404 : : * removal of dead tuples.
1405 : : */
286 akapila@postgresql.o 1406 [ + + ]:GNC 3210 : if (MyReplicationSlot)
1407 : : {
1408 [ + + ]: 70 : if (!retain_dead_tuples)
1409 : 1 : ReplicationSlotDropAcquired();
245 1410 [ + + ]: 69 : else if (can_update_xmin)
1411 : 64 : update_conflict_slot_xmin(xmin);
1412 : : }
1413 : :
1414 : : /* Switch back to original memory context. */
1199 tgl@sss.pgh.pa.us 1415 :CBC 3210 : MemoryContextSwitchTo(oldctx);
1416 : : /* Clean the temporary memory. */
1417 : 3210 : MemoryContextDelete(subctx);
1418 : :
1419 : : /* Wait for more work. */
3255 andres@anarazel.de 1420 : 3210 : rc = WaitLatch(MyLatch,
1421 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1422 : : wait_time,
1423 : : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1424 : :
1425 [ + + ]: 3207 : if (rc & WL_LATCH_SET)
1426 : : {
1427 : 3181 : ResetLatch(MyLatch);
1428 [ + + ]: 3181 : CHECK_FOR_INTERRUPTS();
1429 : : }
1430 : :
2331 rhaas@postgresql.org 1431 [ + + ]: 2700 : if (ConfigReloadPending)
1432 : : {
1433 : 63 : ConfigReloadPending = false;
3312 peter_e@gmx.net 1434 : 63 : ProcessConfigFile(PGC_SIGHUP);
1435 : : }
1436 : : }
1437 : :
1438 : : /* Not reachable */
1439 : : }
1440 : :
1441 : : /*
1442 : : * Determine the minimum non-removable transaction ID across all apply workers
1443 : : * for subscriptions that have retain_dead_tuples enabled. Store the result
1444 : : * in *xmin.
1445 : : */
1446 : : static void
286 akapila@postgresql.o 1447 :GNC 64 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1448 : : {
1449 : : TransactionId nonremovable_xid;
1450 : :
1451 [ - + ]: 64 : Assert(worker != NULL);
1452 : :
1453 : : /*
1454 : : * The replication slot for conflict detection must be created before the
1455 : : * worker starts.
1456 : : */
1457 [ - + ]: 64 : Assert(MyReplicationSlot);
1458 : :
1459 : 64 : SpinLockAcquire(&worker->relmutex);
1460 : 64 : nonremovable_xid = worker->oldest_nonremovable_xid;
1461 : 64 : SpinLockRelease(&worker->relmutex);
1462 : :
1463 : : /*
1464 : : * Return if the apply worker has stopped retention concurrently.
1465 : : *
1466 : : * Although this function is invoked only when retentionactive is true,
1467 : : * the apply worker might stop retention after the launcher fetches the
1468 : : * retentionactive flag.
1469 : : */
245 1470 [ - + ]: 64 : if (!TransactionIdIsValid(nonremovable_xid))
245 akapila@postgresql.o 1471 :UNC 0 : return;
1472 : :
286 akapila@postgresql.o 1473 [ - + - - ]:GNC 64 : if (!TransactionIdIsValid(*xmin) ||
286 akapila@postgresql.o 1474 :UNC 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
286 akapila@postgresql.o 1475 :GNC 64 : *xmin = nonremovable_xid;
1476 : : }
1477 : :
1478 : : /*
1479 : : * Acquire the replication slot used to retain information for conflict
1480 : : * detection, if it exists.
1481 : : *
1482 : : * Return true if successfully acquired, otherwise return false.
1483 : : */
1484 : : static bool
1485 : 513 : acquire_conflict_slot_if_exists(void)
1486 : : {
1487 [ + + ]: 513 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1488 : 512 : return false;
1489 : :
1490 : 1 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1491 : 1 : return true;
1492 : : }
1493 : :
1494 : : /*
1495 : : * Update the xmin the replication slot used to retain information required
1496 : : * for conflict detection.
1497 : : */
1498 : : static void
245 1499 : 64 : update_conflict_slot_xmin(TransactionId new_xmin)
1500 : : {
286 1501 [ - + ]: 64 : Assert(MyReplicationSlot);
245 1502 [ + - - + ]: 64 : Assert(!TransactionIdIsValid(new_xmin) ||
1503 : : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1504 : :
1505 : : /* Return if the xmin value of the slot cannot be updated */
286 1506 [ + + ]: 64 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1507 : 51 : return;
1508 : :
1509 : 13 : SpinLockAcquire(&MyReplicationSlot->mutex);
1510 : 13 : MyReplicationSlot->effective_xmin = new_xmin;
1511 : 13 : MyReplicationSlot->data.xmin = new_xmin;
1512 : 13 : SpinLockRelease(&MyReplicationSlot->mutex);
1513 : :
1514 [ - + ]: 13 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1515 : :
1516 : 13 : ReplicationSlotMarkDirty();
1517 : 13 : ReplicationSlotsComputeRequiredXmin(false);
1518 : :
1519 : : /*
1520 : : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1521 : : * each time. This is acceptable because all concurrent transactions on
1522 : : * the publisher that require the data preceding the slot's xmin should
1523 : : * have already been applied and flushed on the subscriber before the xmin
1524 : : * is advanced. So, even if the slot's xmin regresses after a restart, it
1525 : : * will be advanced again in the next cycle. Therefore, no data required
1526 : : * for conflict detection will be prematurely removed.
1527 : : */
1528 : 13 : return;
1529 : : }
1530 : :
1531 : : /*
1532 : : * Initialize the xmin for the conflict detection slot.
1533 : : */
1534 : : static void
245 1535 : 4 : init_conflict_slot_xmin(void)
1536 : : {
1537 : : TransactionId xmin_horizon;
1538 : :
1539 : : /* Replication slot must exist but shouldn't be initialized. */
1540 [ + - - + ]: 4 : Assert(MyReplicationSlot &&
1541 : : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1542 : :
126 msawada@postgresql.o 1543 : 4 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
286 akapila@postgresql.o 1544 : 4 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1545 : :
1546 : 4 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1547 : :
1548 : 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
1549 : 4 : MyReplicationSlot->effective_xmin = xmin_horizon;
1550 : 4 : MyReplicationSlot->data.xmin = xmin_horizon;
1551 : 4 : SpinLockRelease(&MyReplicationSlot->mutex);
1552 : :
1553 : 4 : ReplicationSlotsComputeRequiredXmin(true);
1554 : :
1555 : 4 : LWLockRelease(ProcArrayLock);
126 msawada@postgresql.o 1556 : 4 : LWLockRelease(ReplicationSlotControlLock);
1557 : :
1558 : : /* Write this slot to disk */
286 akapila@postgresql.o 1559 : 4 : ReplicationSlotMarkDirty();
1560 : 4 : ReplicationSlotSave();
1561 : 4 : }
1562 : :
1563 : : /*
1564 : : * Create and acquire the replication slot used to retain information for
1565 : : * conflict detection, if not yet.
1566 : : */
1567 : : void
245 1568 : 70 : CreateConflictDetectionSlot(void)
1569 : : {
1570 : : /* Exit early, if the replication slot is already created and acquired */
1571 [ + + ]: 70 : if (MyReplicationSlot)
1572 : 66 : return;
1573 : :
1574 [ + - ]: 4 : ereport(LOG,
1575 : : errmsg("creating replication conflict detection slot"));
1576 : :
1577 : 4 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1578 : : false, false, false);
1579 : :
1580 : 4 : init_conflict_slot_xmin();
1581 : : }
1582 : :
1583 : : /*
1584 : : * Is current process the logical replication launcher?
1585 : : */
1586 : : bool
3253 andres@anarazel.de 1587 :CBC 2729 : IsLogicalLauncher(void)
1588 : : {
1589 : 2729 : return LogicalRepCtx->launcher_pid == MyProcPid;
1590 : : }
1591 : :
1592 : : /*
1593 : : * Return the pid of the leader apply worker if the given pid is the pid of a
1594 : : * parallel apply worker, otherwise, return InvalidPid.
1595 : : */
1596 : : pid_t
1203 akapila@postgresql.o 1597 : 864 : GetLeaderApplyWorkerPid(pid_t pid)
1598 : : {
1599 : 864 : int leader_pid = InvalidPid;
1600 : : int i;
1601 : :
1602 : 864 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1603 : :
1604 [ + + ]: 4320 : for (i = 0; i < max_logical_replication_workers; i++)
1605 : : {
1606 : 3456 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1607 : :
1608 [ + + - + : 3456 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
- - - - ]
1609 : : {
1203 akapila@postgresql.o 1610 :UBC 0 : leader_pid = w->leader_pid;
1611 : 0 : break;
1612 : : }
1613 : : }
1614 : :
1203 akapila@postgresql.o 1615 :CBC 864 : LWLockRelease(LogicalRepWorkerLock);
1616 : :
1617 : 864 : return leader_pid;
1618 : : }
1619 : :
1620 : : /*
1621 : : * Returns state of the subscriptions.
1622 : : */
1623 : : Datum
3393 peter_e@gmx.net 1624 : 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1625 : : {
1626 : : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1627 [ - + ]: 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1628 : : int i;
1629 : 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1630 : :
1295 michael@paquier.xyz 1631 : 1 : InitMaterializedSRF(fcinfo, 0);
1632 : :
1633 : : /* Make sure we get consistent view of the workers. */
3393 peter_e@gmx.net 1634 : 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1635 : :
1428 tgl@sss.pgh.pa.us 1636 [ + + ]: 5 : for (i = 0; i < max_logical_replication_workers; i++)
1637 : : {
1638 : : /* for each row */
1389 peter@eisentraut.org 1639 : 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1640 : 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1641 : : int worker_pid;
1642 : : LogicalRepWorker worker;
1643 : :
3393 peter_e@gmx.net 1644 : 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1645 : : sizeof(LogicalRepWorker));
1646 [ + + - + ]: 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1647 : 2 : continue;
1648 : :
1649 [ - + - - ]: 2 : if (OidIsValid(subid) && worker.subid != subid)
3393 peter_e@gmx.net 1650 :UBC 0 : continue;
1651 : :
3393 peter_e@gmx.net 1652 :CBC 2 : worker_pid = worker.proc->pid;
1653 : :
1654 : 2 : values[0] = ObjectIdGetDatum(worker.subid);
181 akapila@postgresql.o 1655 [ + - - + ]:GNC 2 : if (isTableSyncWorker(&worker))
3330 peter_e@gmx.net 1656 :UBC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1657 : : else
3330 peter_e@gmx.net 1658 :CBC 2 : nulls[1] = true;
1659 : 2 : values[2] = Int32GetDatum(worker_pid);
1660 : :
1203 akapila@postgresql.o 1661 [ + - - + ]: 2 : if (isParallelApplyWorker(&worker))
1203 akapila@postgresql.o 1662 :UBC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1663 : : else
3330 peter_e@gmx.net 1664 :CBC 2 : nulls[3] = true;
1665 : :
180 alvherre@kurilemu.de 1666 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(worker.last_lsn))
1203 akapila@postgresql.o 1667 :UBC 0 : nulls[4] = true;
1668 : : else
1203 akapila@postgresql.o 1669 :CBC 2 : values[4] = LSNGetDatum(worker.last_lsn);
3393 peter_e@gmx.net 1670 [ - + ]: 2 : if (worker.last_send_time == 0)
1203 akapila@postgresql.o 1671 :UBC 0 : nulls[5] = true;
1672 : : else
1203 akapila@postgresql.o 1673 :CBC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
3393 peter_e@gmx.net 1674 [ - + ]: 2 : if (worker.last_recv_time == 0)
1203 akapila@postgresql.o 1675 :UBC 0 : nulls[6] = true;
1676 : : else
1203 akapila@postgresql.o 1677 :CBC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
180 alvherre@kurilemu.de 1678 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(worker.reply_lsn))
1203 akapila@postgresql.o 1679 :UBC 0 : nulls[7] = true;
1680 : : else
1203 akapila@postgresql.o 1681 :CBC 2 : values[7] = LSNGetDatum(worker.reply_lsn);
3393 peter_e@gmx.net 1682 [ - + ]: 2 : if (worker.reply_time == 0)
1203 akapila@postgresql.o 1683 :UBC 0 : nulls[8] = true;
1684 : : else
1203 akapila@postgresql.o 1685 :CBC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1686 : :
953 nathan@postgresql.or 1687 [ + - - - : 2 : switch (worker.type)
- - ]
1688 : : {
1689 : 2 : case WORKERTYPE_APPLY:
1690 : 2 : values[9] = CStringGetTextDatum("apply");
1691 : 2 : break;
953 nathan@postgresql.or 1692 :UBC 0 : case WORKERTYPE_PARALLEL_APPLY:
1693 : 0 : values[9] = CStringGetTextDatum("parallel apply");
1694 : 0 : break;
181 akapila@postgresql.o 1695 :UNC 0 : case WORKERTYPE_SEQUENCESYNC:
1696 : 0 : values[9] = CStringGetTextDatum("sequence synchronization");
1697 : 0 : break;
953 nathan@postgresql.or 1698 :UBC 0 : case WORKERTYPE_TABLESYNC:
1699 : 0 : values[9] = CStringGetTextDatum("table synchronization");
1700 : 0 : break;
1701 : 0 : case WORKERTYPE_UNKNOWN:
1702 : : /* Should never happen. */
1703 [ # # ]: 0 : elog(ERROR, "unknown worker type");
1704 : : }
1705 : :
1520 michael@paquier.xyz 1706 :CBC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1707 : : values, nulls);
1708 : :
1709 : : /*
1710 : : * If only a single subscription was requested, and we found it,
1711 : : * break.
1712 : : */
3393 peter_e@gmx.net 1713 [ - + ]: 2 : if (OidIsValid(subid))
3393 peter_e@gmx.net 1714 :UBC 0 : break;
1715 : : }
1716 : :
3393 peter_e@gmx.net 1717 :CBC 1 : LWLockRelease(LogicalRepWorkerLock);
1718 : :
1719 : 1 : return (Datum) 0;
1720 : : }
|