Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * launcher.c
3 : : * PostgreSQL logical replication worker launcher process
4 : : *
5 : : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/launcher.c
9 : : *
10 : : * NOTES
11 : : * This module contains the logical replication worker launcher which
12 : : * uses the background worker infrastructure to start the logical
13 : : * replication workers for every enabled subscription.
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : :
18 : : #include "postgres.h"
19 : :
20 : : #include "access/heapam.h"
21 : : #include "access/htup.h"
22 : : #include "access/htup_details.h"
23 : : #include "access/tableam.h"
24 : : #include "access/xact.h"
25 : : #include "catalog/pg_subscription.h"
26 : : #include "catalog/pg_subscription_rel.h"
27 : : #include "funcapi.h"
28 : : #include "lib/dshash.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "postmaster/bgworker.h"
32 : : #include "postmaster/interrupt.h"
33 : : #include "replication/logicallauncher.h"
34 : : #include "replication/origin.h"
35 : : #include "replication/slot.h"
36 : : #include "replication/walreceiver.h"
37 : : #include "replication/worker_internal.h"
38 : : #include "storage/ipc.h"
39 : : #include "storage/proc.h"
40 : : #include "storage/procarray.h"
41 : : #include "tcop/tcopprot.h"
42 : : #include "utils/builtins.h"
43 : : #include "utils/memutils.h"
44 : : #include "utils/pg_lsn.h"
45 : : #include "utils/snapmgr.h"
46 : : #include "utils/syscache.h"
47 : :
48 : : /* max sleep time between cycles (3min) */
49 : : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 : :
51 : : /* GUC variables */
52 : : int max_logical_replication_workers = 4;
53 : : int max_sync_workers_per_subscription = 2;
54 : : int max_parallel_apply_workers_per_subscription = 2;
55 : :
56 : : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 : :
58 : : typedef struct LogicalRepCtxStruct
59 : : {
60 : : /* Supervisor process. */
61 : : pid_t launcher_pid;
62 : :
63 : : /* Hash table holding last start times of subscriptions' apply workers. */
64 : : dsa_handle last_start_dsa;
65 : : dshash_table_handle last_start_dsh;
66 : :
67 : : /* Background workers. */
68 : : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : : } LogicalRepCtxStruct;
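/*
 * The workers[] flexible array is sized to max_logical_replication_workers
 * when the shared memory segment is created; see ApplyLauncherShmemSize()
 * and ApplyLauncherShmemInit() below.
 */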
70 : :
71 : : static LogicalRepCtxStruct *LogicalRepCtx;
72 : :
73 : : /* an entry in the last-start-times shared hash table */
74 : : typedef struct LauncherLastStartTimesEntry
75 : : {
76 : : Oid subid; /* OID of logrep subscription (hash key) */
77 : : TimestampTz last_start_time; /* last time its apply worker was started */
78 : : } LauncherLastStartTimesEntry;
79 : :
80 : : /* parameters for the last-start-times shared hash table */
81 : : static const dshash_parameters dsh_params = {
82 : : sizeof(Oid),
83 : : sizeof(LauncherLastStartTimesEntry),
84 : : dshash_memcmp,
85 : : dshash_memhash,
86 : : dshash_memcpy,
87 : : LWTRANCHE_LAUNCHER_HASH
88 : : };
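/*
 * The generic dshash_memcmp/dshash_memhash/dshash_memcpy callbacks operate
 * on the key as sizeof(Oid) raw bytes, so entries are hashed and compared
 * by the subscription OID alone.
 */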
89 : :
90 : : static dsa_area *last_start_times_dsa = NULL;
91 : : static dshash_table *last_start_times = NULL;
92 : :
93 : : static bool on_commit_launcher_wakeup = false;
94 : :
95 : :
96 : : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : : static void logicalrep_worker_onexit(int code, Datum arg);
98 : : static void logicalrep_worker_detach(void);
99 : : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : : static int logicalrep_pa_worker_count(Oid subid);
101 : : static void logicalrep_launcher_attach_dshmem(void);
102 : : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : : static bool acquire_conflict_slot_if_exists(void);
106 : : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : : static void init_conflict_slot_xmin(void);
108 : :
109 : :
110 : : /*
111 : : * Load the list of subscriptions.
112 : : *
113 : : * Only the fields interesting for worker start/stop functions are filled for
114 : : * each subscription.
115 : : */
116 : : static List *
3254 peter_e@gmx.net 117 :CBC 2789 : get_subscription_list(void)
118 : : {
119 : 2789 : List *res = NIL;
120 : : Relation rel;
121 : : TableScanDesc scan;
122 : : HeapTuple tup;
123 : : MemoryContext resultcxt;
124 : :
125 : : /* This is the context that we will allocate our output data in */
126 : 2789 : resultcxt = CurrentMemoryContext;
127 : :
128 : : /*
129 : : * Start a transaction so we can access pg_subscription.
130 : : */
131 : 2789 : StartTransactionCommand();
132 : :
2522 andres@anarazel.de 133 : 2789 : rel = table_open(SubscriptionRelationId, AccessShareLock);
2473 134 : 2789 : scan = table_beginscan_catalog(rel, 0, NULL);
135 : :
3254 peter_e@gmx.net 136 [ + + ]: 3617 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : : {
138 : 828 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : : Subscription *sub;
140 : : MemoryContext oldcxt;
141 : :
142 : : /*
143 : : * Allocate our results in the caller's context, not the
144 : : * transaction's. We do this inside the loop, and restore the original
145 : : * context at the end, so that leaky things like heap_getnext() are
146 : : * not called in a potentially long-lived context.
147 : : */
148 : 828 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 : :
7 michael@paquier.xyz 150 :GNC 828 : sub = palloc0_object(Subscription);
2584 andres@anarazel.de 151 :CBC 828 : sub->oid = subform->oid;
3254 peter_e@gmx.net 152 : 828 : sub->dbid = subform->subdbid;
153 : 828 : sub->owner = subform->subowner;
154 : 828 : sub->enabled = subform->subenabled;
155 : 828 : sub->name = pstrdup(NameStr(subform->subname));
147 akapila@postgresql.o 156 :GNC 828 : sub->retaindeadtuples = subform->subretaindeadtuples;
106 157 : 828 : sub->retentionactive = subform->subretentionactive;
158 : : /* We don't fill fields we are not interested in. */
159 : :
3254 peter_e@gmx.net 160 :CBC 828 : res = lappend(res, sub);
161 : 828 : MemoryContextSwitchTo(oldcxt);
162 : : }
163 : :
2473 andres@anarazel.de 164 : 2789 : table_endscan(scan);
2522 165 : 2789 : table_close(rel, AccessShareLock);
166 : :
3254 peter_e@gmx.net 167 : 2789 : CommitTransactionCommand();
168 : :
169 : 2789 : return res;
170 : : }
171 : :
172 : : /*
173 : : * Wait for a background worker to start up and attach to the shmem context.
174 : : *
175 : : * This is only needed for cleaning up the shared memory in case the worker
176 : : * fails to attach.
177 : : *
178 : : * Returns whether the attach was successful.
179 : : */
180 : : static bool
181 : 418 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : : uint16 generation,
183 : : BackgroundWorkerHandle *handle)
184 : : {
176 tgl@sss.pgh.pa.us 185 : 418 : bool result = false;
186 : 418 : bool dropped_latch = false;
187 : :
188 : : for (;;)
3254 peter_e@gmx.net 189 : 1144 : {
190 : : BgwHandleStatus status;
191 : : pid_t pid;
192 : : int rc;
193 : :
194 [ - + ]: 1562 : CHECK_FOR_INTERRUPTS();
195 : :
3157 196 : 1562 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 : :
198 : : /* Worker either died or has started. Return false if died. */
199 [ + + + + ]: 1562 : if (!worker->in_use || worker->proc)
200 : : {
176 tgl@sss.pgh.pa.us 201 : 415 : result = worker->in_use;
3157 peter_e@gmx.net 202 : 415 : LWLockRelease(LogicalRepWorkerLock);
176 tgl@sss.pgh.pa.us 203 : 415 : break;
204 : : }
205 : :
3157 peter_e@gmx.net 206 : 1147 : LWLockRelease(LogicalRepWorkerLock);
207 : :
208 : : /* Check if worker has died before attaching, and clean up after it. */
3254 209 : 1147 : status = GetBackgroundWorkerPid(handle, &pid);
210 : :
211 [ - + ]: 1147 : if (status == BGWH_STOPPED)
212 : : {
3157 peter_e@gmx.net 213 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : : /* Ensure that this was indeed the worker we waited for. */
215 [ # # ]: 0 : if (generation == worker->generation)
216 : 0 : logicalrep_worker_cleanup(worker);
217 : 0 : LWLockRelease(LogicalRepWorkerLock);
176 tgl@sss.pgh.pa.us 218 : 0 : break; /* result is already false */
219 : : }
220 : :
221 : : /*
 222                 :            :      * We need a timeout because we generally don't get notified via latch
223 : : * about the worker attach. But we don't expect to have to wait long.
224 : : */
3254 peter_e@gmx.net 225 :CBC 1147 : rc = WaitLatch(MyLatch,
226 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 : :
3116 andres@anarazel.de 229 [ + + ]: 1147 : if (rc & WL_LATCH_SET)
230 : : {
231 : 478 : ResetLatch(MyLatch);
232 [ + + ]: 478 : CHECK_FOR_INTERRUPTS();
176 tgl@sss.pgh.pa.us 233 : 475 : dropped_latch = true;
234 : : }
235 : : }
236 : :
237 : : /*
238 : : * If we had to clear a latch event in order to wait, be sure to restore
 239                 :            :      * it before exiting.  Otherwise the caller may miss events.
240 : : */
241 [ + - ]: 415 : if (dropped_latch)
242 : 415 : SetLatch(MyLatch);
243 : :
244 : 415 : return result;
245 : : }
246 : :
247 : : /*
 248                 :            :  * Walks the workers array and searches for one that matches the given
 249                 :            :  * worker type, subscription id, and relation id.
250 : : *
251 : : * For both apply workers and sequencesync workers, the relid should be set to
252 : : * InvalidOid, as these workers handle changes across all tables and sequences
253 : : * respectively, rather than targeting a specific relation. For tablesync
254 : : * workers, the relid should be set to the OID of the relation being
255 : : * synchronized.
256 : : */
257 : : LogicalRepWorker *
50 akapila@postgresql.o 258 :GNC 3153 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
259 : : bool only_running)
260 : : {
261 : : int i;
3136 bruce@momjian.us 262 :CBC 3153 : LogicalRepWorker *res = NULL;
263 : :
264 : : /* relid must be valid only for table sync workers */
50 akapila@postgresql.o 265 [ - + ]:GNC 3153 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
3254 peter_e@gmx.net 266 [ - + ]:CBC 3153 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
267 : :
268 : : /* Search for an attached worker that matches the specified criteria. */
269 [ + + ]: 9680 : for (i = 0; i < max_logical_replication_workers; i++)
270 : : {
3136 bruce@momjian.us 271 : 8462 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
272 : :
273 : : /* Skip parallel apply workers. */
1073 akapila@postgresql.o 274 [ + + - + ]: 8462 : if (isParallelApplyWorker(w))
1073 akapila@postgresql.o 275 :UBC 0 : continue;
276 : :
3157 peter_e@gmx.net 277 [ + + + + :CBC 8462 : if (w->in_use && w->subid == subid && w->relid == relid &&
+ + ]
50 akapila@postgresql.o 278 [ + + + + :GNC 1963 : w->type == wtype && (!only_running || w->proc))
+ - ]
279 : : {
3254 peter_e@gmx.net 280 :CBC 1935 : res = w;
281 : 1935 : break;
282 : : }
283 : : }
284 : :
285 : 3153 : return res;
286 : : }
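/*
 * A minimal usage sketch (mirroring logicalrep_worker_wakeup() below); the
 * caller must hold LogicalRepWorkerLock across the lookup and any use of
 * the returned pointer:
 *
 *     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 *     worker = logicalrep_worker_find(WORKERTYPE_APPLY, subid, InvalidOid,
 *                                     true);
 *     if (worker)
 *         logicalrep_worker_wakeup_ptr(worker);
 *     LWLockRelease(LogicalRepWorkerLock);
 */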
287 : :
288 : : /*
289 : : * Similar to logicalrep_worker_find(), but returns a list of all workers for
290 : : * the subscription, instead of just one.
291 : : */
292 : : List *
511 akapila@postgresql.o 293 : 663 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
294 : : {
295 : : int i;
3057 peter_e@gmx.net 296 : 663 : List *res = NIL;
297 : :
511 akapila@postgresql.o 298 [ + + ]: 663 : if (acquire_lock)
299 : 120 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
300 : :
3057 peter_e@gmx.net 301 [ - + ]: 663 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
302 : :
303 : : /* Search for attached worker for a given subscription id. */
304 [ + + ]: 3443 : for (i = 0; i < max_logical_replication_workers; i++)
305 : : {
306 : 2780 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
307 : :
308 [ + + + + : 2780 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ + + + ]
309 : 480 : res = lappend(res, w);
310 : : }
311 : :
511 akapila@postgresql.o 312 [ + + ]: 663 : if (acquire_lock)
313 : 120 : LWLockRelease(LogicalRepWorkerLock);
314 : :
3057 peter_e@gmx.net 315 : 663 : return res;
316 : : }
317 : :
318 : : /*
319 : : * Start new logical replication background worker, if possible.
320 : : *
321 : : * Returns true on success, false on failure.
322 : : */
323 : : bool
856 akapila@postgresql.o 324 : 418 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
325 : : Oid dbid, Oid subid, const char *subname, Oid userid,
326 : : Oid relid, dsm_handle subworker_dsm,
327 : : bool retain_dead_tuples)
328 : : {
329 : : BackgroundWorker bgw;
330 : : BackgroundWorkerHandle *bgw_handle;
331 : : uint16 generation;
332 : : int i;
3136 bruce@momjian.us 333 : 418 : int slot = 0;
334 : 418 : LogicalRepWorker *worker = NULL;
335 : : int nsyncworkers;
336 : : int nparallelapplyworkers;
337 : : TimestampTz now;
856 akapila@postgresql.o 338 : 418 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
42 akapila@postgresql.o 339 :GNC 418 : bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
856 akapila@postgresql.o 340 :CBC 418 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
341 : :
342 : : /*----------
343 : : * Sanity checks:
344 : : * - must be valid worker type
 345                 :            :      * - tablesync workers are the only ones to have relid
346 : : * - parallel apply worker is the only kind of subworker
347 : : * - The replication slot used in conflict detection is created when
348 : : * retain_dead_tuples is enabled
349 : : */
350 [ - + ]: 418 : Assert(wtype != WORKERTYPE_UNKNOWN);
351 [ - + ]: 418 : Assert(is_tablesync_worker == OidIsValid(relid));
352 [ - + ]: 418 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
147 akapila@postgresql.o 353 [ + + - + ]:GNC 418 : Assert(!retain_dead_tuples || MyReplicationSlot);
354 : :
3129 peter_e@gmx.net 355 [ + + ]:CBC 418 : ereport(DEBUG1,
356 : : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
357 : : subname)));
358 : :
359 : : /* Report this after the initial starting message for consistency. */
271 msawada@postgresql.o 360 [ - + ]: 418 : if (max_active_replication_origins == 0)
3254 peter_e@gmx.net 361 [ # # ]:UBC 0 : ereport(ERROR,
362 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
363 : : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
364 : :
365 : : /*
366 : : * We need to do the modification of the shared memory under lock so that
367 : : * we have consistent view.
368 : : */
3254 peter_e@gmx.net 369 :CBC 418 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
370 : :
3157 371 : 418 : retry:
372 : : /* Find unused worker slot. */
373 [ + - ]: 736 : for (i = 0; i < max_logical_replication_workers; i++)
374 : : {
375 : 736 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
376 : :
377 [ + + ]: 736 : if (!w->in_use)
378 : : {
379 : 418 : worker = w;
380 : 418 : slot = i;
3254 381 : 418 : break;
382 : : }
383 : : }
384 : :
3157 385 : 418 : nsyncworkers = logicalrep_sync_worker_count(subid);
386 : :
387 : 418 : now = GetCurrentTimestamp();
388 : :
389 : : /*
390 : : * If we didn't find a free slot, try to do garbage collection. The
391 : : * reason we do this is because if some worker failed to start up and its
392 : : * parent has crashed while waiting, the in_use state was never cleared.
393 : : */
394 [ + - - + ]: 418 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
395 : : {
3136 bruce@momjian.us 396 :UBC 0 : bool did_cleanup = false;
397 : :
3157 peter_e@gmx.net 398 [ # # ]: 0 : for (i = 0; i < max_logical_replication_workers; i++)
399 : : {
400 : 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
401 : :
402 : : /*
403 : : * If the worker was marked in use but didn't manage to attach in
404 : : * time, clean it up.
405 : : */
406 [ # # # # : 0 : if (w->in_use && !w->proc &&
# # ]
407 : 0 : TimestampDifferenceExceeds(w->launch_time, now,
408 : : wal_receiver_timeout))
409 : : {
410 [ # # ]: 0 : elog(WARNING,
411 : : "logical replication worker for subscription %u took too long to start; canceled",
412 : : w->subid);
413 : :
414 : 0 : logicalrep_worker_cleanup(w);
415 : 0 : did_cleanup = true;
416 : : }
417 : : }
418 : :
419 [ # # ]: 0 : if (did_cleanup)
420 : 0 : goto retry;
421 : : }
422 : :
423 : : /*
424 : : * We don't allow to invoke more sync workers once we have reached the
425 : : * sync worker limit per subscription. So, just return silently as we
426 : : * might get here because of an otherwise harmless race condition.
427 : : */
42 akapila@postgresql.o 428 [ + + + + ]:GNC 418 : if ((is_tablesync_worker || is_sequencesync_worker) &&
429 [ - + ]: 207 : nsyncworkers >= max_sync_workers_per_subscription)
430 : : {
3157 peter_e@gmx.net 431 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
1073 akapila@postgresql.o 432 : 0 : return false;
433 : : }
434 : :
1073 akapila@postgresql.o 435 :CBC 418 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
436 : :
437 : : /*
438 : : * Return false if the number of parallel apply workers reached the limit
439 : : * per subscription.
440 : : */
441 [ + + ]: 418 : if (is_parallel_apply_worker &&
442 [ - + ]: 10 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
443 : : {
1073 akapila@postgresql.o 444 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
445 : 0 : return false;
446 : : }
447 : :
448 : : /*
449 : : * However if there are no more free worker slots, inform user about it
450 : : * before exiting.
451 : : */
3254 peter_e@gmx.net 452 [ - + ]:CBC 418 : if (worker == NULL)
453 : : {
3249 fujii@postgresql.org 454 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3254 peter_e@gmx.net 455 [ # # ]: 0 : ereport(WARNING,
456 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
457 : : errmsg("out of logical replication worker slots"),
458 : : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
1073 akapila@postgresql.o 459 : 0 : return false;
460 : : }
461 : :
462 : : /* Prepare the worker slot. */
856 akapila@postgresql.o 463 :CBC 418 : worker->type = wtype;
3157 peter_e@gmx.net 464 : 418 : worker->launch_time = now;
465 : 418 : worker->in_use = true;
466 : 418 : worker->generation++;
3191 467 : 418 : worker->proc = NULL;
3254 468 : 418 : worker->dbid = dbid;
469 : 418 : worker->userid = userid;
470 : 418 : worker->subid = subid;
3191 471 : 418 : worker->relid = relid;
472 : 418 : worker->relstate = SUBREL_STATE_UNKNOWN;
473 : 418 : worker->relstate_lsn = InvalidXLogRecPtr;
1567 akapila@postgresql.o 474 : 418 : worker->stream_fileset = NULL;
1064 475 [ + + ]: 418 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
1073 476 : 418 : worker->parallel_apply = is_parallel_apply_worker;
147 akapila@postgresql.o 477 :GNC 418 : worker->oldest_nonremovable_xid = retain_dead_tuples
478 : 1 : ? MyReplicationSlot->data.xmin
479 [ + + ]: 418 : : InvalidTransactionId;
3191 peter_e@gmx.net 480 :CBC 418 : worker->last_lsn = InvalidXLogRecPtr;
481 : 418 : TIMESTAMP_NOBEGIN(worker->last_send_time);
482 : 418 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
483 : 418 : worker->reply_lsn = InvalidXLogRecPtr;
484 : 418 : TIMESTAMP_NOBEGIN(worker->reply_time);
42 akapila@postgresql.o 485 :GNC 418 : worker->last_seqsync_start_time = 0;
486 : :
487 : : /* Before releasing lock, remember generation for future identification. */
3012 tgl@sss.pgh.pa.us 488 :CBC 418 : generation = worker->generation;
489 : :
3254 peter_e@gmx.net 490 : 418 : LWLockRelease(LogicalRepWorkerLock);
491 : :
492 : : /* Register the new dynamic worker. */
3167 tgl@sss.pgh.pa.us 493 : 418 : memset(&bgw, 0, sizeof(bgw));
3136 bruce@momjian.us 494 : 418 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
495 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3254 peter_e@gmx.net 496 : 418 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
898 nathan@postgresql.or 497 : 418 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
498 : :
848 akapila@postgresql.o 499 [ + + + + : 418 : switch (worker->type)
- - ]
500 : : {
501 : 201 : case WORKERTYPE_APPLY:
502 : 201 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
503 : 201 : snprintf(bgw.bgw_name, BGW_MAXLEN,
504 : : "logical replication apply worker for subscription %u",
505 : : subid);
506 : 201 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
507 : 201 : break;
508 : :
509 : 10 : case WORKERTYPE_PARALLEL_APPLY:
510 : 10 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
511 : 10 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : : "logical replication parallel apply worker for subscription %u",
513 : : subid);
514 : 10 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
515 : :
516 : 10 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
517 : 10 : break;
518 : :
42 akapila@postgresql.o 519 :GNC 9 : case WORKERTYPE_SEQUENCESYNC:
520 : 9 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
521 : 9 : snprintf(bgw.bgw_name, BGW_MAXLEN,
522 : : "logical replication sequencesync worker for subscription %u",
523 : : subid);
524 : 9 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
525 : 9 : break;
526 : :
848 akapila@postgresql.o 527 :CBC 198 : case WORKERTYPE_TABLESYNC:
42 akapila@postgresql.o 528 :GNC 198 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
848 akapila@postgresql.o 529 :CBC 198 : snprintf(bgw.bgw_name, BGW_MAXLEN,
530 : : "logical replication tablesync worker for subscription %u sync %u",
531 : : subid,
532 : : relid);
533 : 198 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
534 : 198 : break;
535 : :
848 akapila@postgresql.o 536 :UBC 0 : case WORKERTYPE_UNKNOWN:
537 : : /* Should never happen. */
538 [ # # ]: 0 : elog(ERROR, "unknown worker type");
539 : : }
540 : :
3254 peter_e@gmx.net 541 :CBC 418 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
542 : 418 : bgw.bgw_notify_pid = MyProcPid;
3164 fujii@postgresql.org 543 : 418 : bgw.bgw_main_arg = Int32GetDatum(slot);
544 : :
3254 peter_e@gmx.net 545 [ - + ]: 418 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
546 : : {
547 : : /* Failed to start worker, so clean up the worker slot. */
3012 tgl@sss.pgh.pa.us 548 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
549 [ # # ]: 0 : Assert(generation == worker->generation);
550 : 0 : logicalrep_worker_cleanup(worker);
551 : 0 : LWLockRelease(LogicalRepWorkerLock);
552 : :
3254 peter_e@gmx.net 553 [ # # ]: 0 : ereport(WARNING,
554 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
555 : : errmsg("out of background worker slots"),
556 : : errhint("You might need to increase \"%s\".", "max_worker_processes")));
1073 akapila@postgresql.o 557 : 0 : return false;
558 : : }
559 : :
560 : : /* Now wait until it attaches. */
1073 akapila@postgresql.o 561 :CBC 418 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
562 : : }
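/*
 * A minimal call sketch, mirroring how ApplyLauncherMain() below launches an
 * apply worker (tablesync callers pass the relation OID instead of
 * InvalidOid):
 *
 *     if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
 *                                   sub->dbid, sub->oid, sub->name,
 *                                   sub->owner, InvalidOid,
 *                                   DSM_HANDLE_INVALID, false))
 *         (retry after wal_retrieve_retry_interval elapses)
 */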
563 : :
564 : : /*
565 : : * Internal function to stop the worker and wait until it detaches from the
566 : : * slot.
567 : : */
568 : : static void
569 : 84 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
570 : : {
571 : : uint16 generation;
572 : :
573 [ - + ]: 84 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
574 : :
575 : : /*
 576                 :            :      * Remember which generation our worker was, so we can check whether what
 577                 :            :      * we see is still the same one.
578 : : */
3157 peter_e@gmx.net 579 : 84 : generation = worker->generation;
580 : :
581 : : /*
582 : : * If we found a worker but it does not have proc set then it is still
583 : : * starting up; wait for it to finish starting and then kill it.
584 : : */
585 [ + - + + ]: 84 : while (worker->in_use && !worker->proc)
586 : : {
587 : : int rc;
588 : :
3254 589 : 1 : LWLockRelease(LogicalRepWorkerLock);
590 : :
591 : : /* Wait a bit --- we don't expect to have to wait long. */
3116 andres@anarazel.de 592 : 1 : rc = WaitLatch(MyLatch,
593 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
594 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
595 : :
596 [ - + ]: 1 : if (rc & WL_LATCH_SET)
597 : : {
3116 andres@anarazel.de 598 :UBC 0 : ResetLatch(MyLatch);
599 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
600 : : }
601 : :
602 : : /* Recheck worker status. */
3254 peter_e@gmx.net 603 :CBC 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
604 : :
605 : : /*
606 : : * Check whether the worker slot is no longer used, which would mean
607 : : * that the worker has exited, or whether the worker generation is
608 : : * different, meaning that a different worker has taken the slot.
609 : : */
3157 610 [ + - - + ]: 1 : if (!worker->in_use || worker->generation != generation)
3250 peter_e@gmx.net 611 :UBC 0 : return;
612 : :
613 : : /* Worker has assigned proc, so it has started. */
3250 peter_e@gmx.net 614 [ + - ]:CBC 1 : if (worker->proc)
3254 615 : 1 : break;
616 : : }
617 : :
618 : : /* Now terminate the worker ... */
1073 akapila@postgresql.o 619 : 84 : kill(worker->proc->pid, signo);
620 : :
621 : : /* ... and wait for it to die. */
622 : : for (;;)
3254 peter_e@gmx.net 623 : 112 : {
624 : : int rc;
625 : :
626 : : /* is it gone? */
3157 627 [ + + + + ]: 196 : if (!worker->proc || worker->generation != generation)
628 : : break;
629 : :
3091 tgl@sss.pgh.pa.us 630 : 112 : LWLockRelease(LogicalRepWorkerLock);
631 : :
632 : : /* Wait a bit --- we don't expect to have to wait long. */
3116 andres@anarazel.de 633 : 112 : rc = WaitLatch(MyLatch,
634 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
635 : : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
636 : :
637 [ + + ]: 112 : if (rc & WL_LATCH_SET)
638 : : {
639 : 36 : ResetLatch(MyLatch);
640 [ + + ]: 36 : CHECK_FOR_INTERRUPTS();
641 : : }
642 : :
3091 tgl@sss.pgh.pa.us 643 : 112 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
644 : : }
645 : : }
646 : :
647 : : /*
648 : : * Stop the logical replication worker that matches the specified worker type,
649 : : * subscription id, and relation id.
650 : : */
651 : : void
50 akapila@postgresql.o 652 :GNC 97 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
653 : : {
654 : : LogicalRepWorker *worker;
655 : :
656 : : /* relid must be valid only for table sync workers */
657 [ - + ]: 97 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
658 : :
1073 akapila@postgresql.o 659 :CBC 97 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
660 : :
50 akapila@postgresql.o 661 :GNC 97 : worker = logicalrep_worker_find(wtype, subid, relid, false);
662 : :
1073 akapila@postgresql.o 663 [ + + ]:CBC 97 : if (worker)
664 : : {
665 [ + - - + ]: 76 : Assert(!isParallelApplyWorker(worker));
666 : 76 : logicalrep_worker_stop_internal(worker, SIGTERM);
667 : : }
668 : :
669 : 97 : LWLockRelease(LogicalRepWorkerLock);
670 : 97 : }
671 : :
672 : : /*
673 : : * Stop the given logical replication parallel apply worker.
674 : : *
 675                 :            :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
676 : : * worker so that the worker exits cleanly.
677 : : */
678 : : void
953 679 : 5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
680 : : {
681 : : int slot_no;
682 : : uint16 generation;
683 : : LogicalRepWorker *worker;
684 : :
685 [ - + ]: 5 : SpinLockAcquire(&winfo->shared->mutex);
686 : 5 : generation = winfo->shared->logicalrep_worker_generation;
687 : 5 : slot_no = winfo->shared->logicalrep_worker_slot_no;
688 : 5 : SpinLockRelease(&winfo->shared->mutex);
689 : :
1073 690 [ + - - + ]: 5 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
691 : :
692 : : /*
693 : : * Detach from the error_mq_handle for the parallel apply worker before
694 : : * stopping it. This prevents the leader apply worker from trying to
695 : : * receive the message from the error queue that might already be detached
696 : : * by the parallel apply worker.
697 : : */
953 698 [ + - ]: 5 : if (winfo->error_mq_handle)
699 : : {
700 : 5 : shm_mq_detach(winfo->error_mq_handle);
701 : 5 : winfo->error_mq_handle = NULL;
702 : : }
703 : :
1073 704 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 : :
706 : 5 : worker = &LogicalRepCtx->workers[slot_no];
707 [ + - - + ]: 5 : Assert(isParallelApplyWorker(worker));
708 : :
709 : : /*
710 : : * Only stop the worker if the generation matches and the worker is alive.
711 : : */
712 [ + - + - ]: 5 : if (worker->generation == generation && worker->proc)
84 713 : 5 : logicalrep_worker_stop_internal(worker, SIGUSR2);
714 : :
3091 tgl@sss.pgh.pa.us 715 : 5 : LWLockRelease(LogicalRepWorkerLock);
3254 peter_e@gmx.net 716 : 5 : }
717 : :
718 : : /*
719 : : * Wake up (using latch) any logical replication worker that matches the
720 : : * specified worker type, subscription id, and relation id.
721 : : */
722 : : void
50 akapila@postgresql.o 723 :GNC 212 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
724 : : {
725 : : LogicalRepWorker *worker;
726 : :
727 : : /* relid must be valid only for table sync workers */
728 [ - + ]: 212 : Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
729 : :
3191 peter_e@gmx.net 730 :CBC 212 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
731 : :
50 akapila@postgresql.o 732 :GNC 212 : worker = logicalrep_worker_find(wtype, subid, relid, true);
733 : :
3191 peter_e@gmx.net 734 [ + - ]:CBC 212 : if (worker)
735 : 212 : logicalrep_worker_wakeup_ptr(worker);
736 : :
3092 tgl@sss.pgh.pa.us 737 : 212 : LWLockRelease(LogicalRepWorkerLock);
3191 peter_e@gmx.net 738 : 212 : }
739 : :
740 : : /*
741 : : * Wake up (using latch) the specified logical replication worker.
742 : : *
743 : : * Caller must hold lock, else worker->proc could change under us.
744 : : */
745 : : void
746 : 653 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
747 : : {
3092 tgl@sss.pgh.pa.us 748 [ - + ]: 653 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
749 : :
3191 peter_e@gmx.net 750 : 653 : SetLatch(&worker->proc->procLatch);
751 : 653 : }
752 : :
753 : : /*
754 : : * Attach to a slot.
755 : : */
756 : : void
3254 757 : 544 : logicalrep_worker_attach(int slot)
758 : : {
759 : : /* Block concurrent access. */
760 : 544 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
761 : :
762 [ + - - + ]: 544 : Assert(slot >= 0 && slot < max_logical_replication_workers);
763 : 544 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
764 : :
3157 765 [ - + ]: 544 : if (!MyLogicalRepWorker->in_use)
766 : : {
3157 peter_e@gmx.net 767 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
768 [ # # ]: 0 : ereport(ERROR,
769 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
770 : : errmsg("logical replication worker slot %d is empty, cannot attach",
771 : : slot)));
772 : : }
773 : :
3254 peter_e@gmx.net 774 [ - + ]:CBC 544 : if (MyLogicalRepWorker->proc)
775 : : {
3157 peter_e@gmx.net 776 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3254 777 [ # # ]: 0 : ereport(ERROR,
778 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
779 : : errmsg("logical replication worker slot %d is already used by "
780 : : "another worker, cannot attach", slot)));
781 : : }
782 : :
3254 peter_e@gmx.net 783 :CBC 544 : MyLogicalRepWorker->proc = MyProc;
784 : 544 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
785 : :
786 : 544 : LWLockRelease(LogicalRepWorkerLock);
787 : 544 : }
788 : :
789 : : /*
790 : : * Stop the parallel apply workers if any, and detach the leader apply worker
791 : : * (cleans up the worker info).
792 : : */
793 : : static void
794 : 544 : logicalrep_worker_detach(void)
795 : : {
796 : : /* Stop the parallel apply workers. */
1073 akapila@postgresql.o 797 [ + + ]: 544 : if (am_leader_apply_worker())
798 : : {
799 : : List *workers;
800 : : ListCell *lc;
801 : :
802 : : /*
803 : : * Detach from the error_mq_handle for all parallel apply workers
804 : : * before terminating them. This prevents the leader apply worker from
805 : : * receiving the worker termination message and sending it to logs
806 : : * when the same is already done by the parallel worker.
807 : : */
808 : 325 : pa_detach_all_error_mq();
809 : :
810 : 325 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
811 : :
511 812 : 325 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
1073 813 [ + - + + : 656 : foreach(lc, workers)
+ + ]
814 : : {
815 : 331 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
816 : :
817 [ + - + + ]: 331 : if (isParallelApplyWorker(w))
818 : 3 : logicalrep_worker_stop_internal(w, SIGTERM);
819 : : }
820 : :
821 : 325 : LWLockRelease(LogicalRepWorkerLock);
822 : :
137 tgl@sss.pgh.pa.us 823 :GNC 325 : list_free(workers);
824 : : }
825 : :
826 : : /* Block concurrent access. */
3254 peter_e@gmx.net 827 :CBC 544 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
828 : :
3157 829 : 544 : logicalrep_worker_cleanup(MyLogicalRepWorker);
830 : :
3254 831 : 544 : LWLockRelease(LogicalRepWorkerLock);
832 : 544 : }
833 : :
834 : : /*
835 : : * Clean up worker info.
836 : : */
837 : : static void
3157 838 : 544 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
839 : : {
840 [ - + ]: 544 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
841 : :
845 akapila@postgresql.o 842 : 544 : worker->type = WORKERTYPE_UNKNOWN;
3157 peter_e@gmx.net 843 : 544 : worker->in_use = false;
844 : 544 : worker->proc = NULL;
845 : 544 : worker->dbid = InvalidOid;
846 : 544 : worker->userid = InvalidOid;
847 : 544 : worker->subid = InvalidOid;
848 : 544 : worker->relid = InvalidOid;
1064 akapila@postgresql.o 849 : 544 : worker->leader_pid = InvalidPid;
1073 850 : 544 : worker->parallel_apply = false;
3157 peter_e@gmx.net 851 : 544 : }
852 : :
853 : : /*
854 : : * Cleanup function for logical replication launcher.
855 : : *
856 : : * Called on logical replication launcher exit.
857 : : */
858 : : static void
3164 fujii@postgresql.org 859 : 413 : logicalrep_launcher_onexit(int code, Datum arg)
860 : : {
861 : 413 : LogicalRepCtx->launcher_pid = 0;
862 : 413 : }
863 : :
864 : : /*
 865                 :            :  * Reset the last_seqsync_start_time for the sequencesync worker, which is
 866                 :            :  * tracked in the subscription's apply worker.
 867                 :            :  *
 868                 :            :  * Note that this value is not stored in the sequencesync worker itself,
 869                 :            :  * because that worker has already finished and is about to exit.
870 : : */
871 : : void
42 akapila@postgresql.o 872 :GNC 5 : logicalrep_reset_seqsync_start_time(void)
873 : : {
874 : : LogicalRepWorker *worker;
875 : :
876 : : /*
877 : : * The apply worker can't access last_seqsync_start_time concurrently, so
878 : : * it is okay to use SHARED lock here. See ProcessSequencesForSync().
879 : : */
880 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
881 : :
882 : 5 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
883 : 5 : MyLogicalRepWorker->subid, InvalidOid,
884 : : true);
885 [ + - ]: 5 : if (worker)
886 : 5 : worker->last_seqsync_start_time = 0;
887 : :
888 : 5 : LWLockRelease(LogicalRepWorkerLock);
889 : 5 : }
890 : :
891 : : /*
892 : : * Cleanup function.
893 : : *
894 : : * Called on logical replication worker exit.
895 : : */
896 : : static void
3254 peter_e@gmx.net 897 :CBC 544 : logicalrep_worker_onexit(int code, Datum arg)
898 : : {
899 : : /* Disconnect gracefully from the remote side. */
1680 alvherre@alvh.no-ip. 900 [ + + ]: 544 : if (LogRepWorkerWalRcvConn)
901 : 431 : walrcv_disconnect(LogRepWorkerWalRcvConn);
902 : :
3254 peter_e@gmx.net 903 : 544 : logicalrep_worker_detach();
904 : :
905 : : /* Cleanup fileset used for streaming transactions. */
1567 akapila@postgresql.o 906 [ + + ]: 544 : if (MyLogicalRepWorker->stream_fileset != NULL)
907 : 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
908 : :
909 : : /*
910 : : * Session level locks may be acquired outside of a transaction in
911 : : * parallel apply mode and will not be released when the worker
912 : : * terminates, so manually release all locks before the worker exits.
913 : : *
914 : : * The locks will be acquired once the worker is initialized.
915 : : */
959 916 [ + + ]: 544 : if (!InitializingApplyWorker)
917 : 479 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
918 : :
3121 peter_e@gmx.net 919 : 544 : ApplyLauncherWakeup();
3254 920 : 544 : }
921 : :
922 : : /*
923 : : * Count the number of registered (not necessarily running) sync workers
924 : : * for a subscription.
925 : : */
926 : : int
3191 927 : 1327 : logicalrep_sync_worker_count(Oid subid)
928 : : {
929 : : int i;
3136 bruce@momjian.us 930 : 1327 : int res = 0;
931 : :
3191 peter_e@gmx.net 932 [ - + ]: 1327 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
933 : :
934 : : /* Search for attached worker for a given subscription id. */
935 [ + + ]: 6841 : for (i = 0; i < max_logical_replication_workers; i++)
936 : : {
3136 bruce@momjian.us 937 : 5514 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
938 : :
42 akapila@postgresql.o 939 [ + + + - :GNC 5514 : if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
+ + + - -
+ ]
3191 peter_e@gmx.net 940 :CBC 1510 : res++;
941 : : }
942 : :
943 : 1327 : return res;
944 : : }
945 : :
946 : : /*
947 : : * Count the number of registered (but not necessarily running) parallel apply
948 : : * workers for a subscription.
949 : : */
950 : : static int
1073 akapila@postgresql.o 951 : 418 : logicalrep_pa_worker_count(Oid subid)
952 : : {
953 : : int i;
954 : 418 : int res = 0;
955 : :
956 [ - + ]: 418 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
957 : :
958 : : /*
959 : : * Scan all attached parallel apply workers, only counting those which
960 : : * have the given subscription id.
961 : : */
962 [ + + ]: 2214 : for (i = 0; i < max_logical_replication_workers; i++)
963 : : {
964 : 1796 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
965 : :
845 966 [ + + + + : 1796 : if (isParallelApplyWorker(w) && w->subid == subid)
+ - ]
1073 967 : 2 : res++;
968 : : }
969 : :
970 : 418 : return res;
971 : : }
972 : :
973 : : /*
974 : : * ApplyLauncherShmemSize
975 : : * Compute space needed for replication launcher shared memory
976 : : */
977 : : Size
3254 peter_e@gmx.net 978 : 4124 : ApplyLauncherShmemSize(void)
979 : : {
980 : : Size size;
981 : :
982 : : /*
983 : : * Need the fixed struct and the array of LogicalRepWorker.
984 : : */
985 : 4124 : size = sizeof(LogicalRepCtxStruct);
986 : 4124 : size = MAXALIGN(size);
987 : 4124 : size = add_size(size, mul_size(max_logical_replication_workers,
988 : : sizeof(LogicalRepWorker)));
989 : 4124 : return size;
990 : : }
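/*
 * For example, with the default max_logical_replication_workers = 4 this is
 * MAXALIGN(sizeof(LogicalRepCtxStruct)) + 4 * sizeof(LogicalRepWorker),
 * computed with add_size()/mul_size() to guard against overflow.
 */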
991 : :
992 : : /*
993 : : * ApplyLauncherRegister
994 : : * Register a background worker running the logical replication launcher.
995 : : */
996 : : void
997 : 849 : ApplyLauncherRegister(void)
998 : : {
999 : : BackgroundWorker bgw;
1000 : :
1001 : : /*
1002 : : * The logical replication launcher is disabled during binary upgrades, to
1003 : : * prevent logical replication workers from running on the source cluster.
1004 : : * That could cause replication origins to move forward after having been
1005 : : * copied to the target cluster, potentially creating conflicts with the
1006 : : * copied data files.
1007 : : */
705 michael@paquier.xyz 1008 [ + + + + ]: 849 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
3254 peter_e@gmx.net 1009 : 57 : return;
1010 : :
3167 tgl@sss.pgh.pa.us 1011 : 792 : memset(&bgw, 0, sizeof(bgw));
3136 bruce@momjian.us 1012 : 792 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
1013 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3254 peter_e@gmx.net 1014 : 792 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
898 nathan@postgresql.or 1015 : 792 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3183 rhaas@postgresql.org 1016 : 792 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
3254 peter_e@gmx.net 1017 : 792 : snprintf(bgw.bgw_name, BGW_MAXLEN,
1018 : : "logical replication launcher");
3030 1019 : 792 : snprintf(bgw.bgw_type, BGW_MAXLEN,
1020 : : "logical replication launcher");
3254 1021 : 792 : bgw.bgw_restart_time = 5;
1022 : 792 : bgw.bgw_notify_pid = 0;
1023 : 792 : bgw.bgw_main_arg = (Datum) 0;
1024 : :
1025 : 792 : RegisterBackgroundWorker(&bgw);
1026 : : }
1027 : :
1028 : : /*
1029 : : * ApplyLauncherShmemInit
1030 : : * Allocate and initialize replication launcher shared memory
1031 : : */
1032 : : void
1033 : 1069 : ApplyLauncherShmemInit(void)
1034 : : {
1035 : : bool found;
1036 : :
1037 : 1069 : LogicalRepCtx = (LogicalRepCtxStruct *)
1038 : 1069 : ShmemInitStruct("Logical Replication Launcher Data",
1039 : : ApplyLauncherShmemSize(),
1040 : : &found);
1041 : :
1042 [ + - ]: 1069 : if (!found)
1043 : : {
1044 : : int slot;
1045 : :
1046 : 1069 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
1047 : :
1057 tgl@sss.pgh.pa.us 1048 : 1069 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
1049 : 1069 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
1050 : :
1051 : : /* Initialize memory and spin locks for each worker slot. */
3191 peter_e@gmx.net 1052 [ + + ]: 5308 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1053 : : {
1054 : 4239 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1055 : :
1056 : 4239 : memset(worker, 0, sizeof(LogicalRepWorker));
1057 : 4239 : SpinLockInit(&worker->relmutex);
1058 : : }
1059 : : }
3254 1060 : 1069 : }
1061 : :
1062 : : /*
1063 : : * Initialize or attach to the dynamic shared hash table that stores the
1064 : : * last-start times, if not already done.
1065 : : * This must be called before accessing the table.
1066 : : */
1067 : : static void
1060 tgl@sss.pgh.pa.us 1068 : 720 : logicalrep_launcher_attach_dshmem(void)
1069 : : {
1070 : : MemoryContext oldcontext;
1071 : :
1072 : : /* Quick exit if we already did this. */
1057 1073 [ + + ]: 720 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1060 1074 [ + + ]: 667 : last_start_times != NULL)
1075 : 475 : return;
1076 : :
1077 : : /* Otherwise, use a lock to ensure only one process creates the table. */
1078 : 245 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1079 : :
1080 : : /* Be sure any local memory allocated by DSA routines is persistent. */
1081 : 245 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1082 : :
1057 1083 [ + + ]: 245 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1084 : : {
1085 : : /* Initialize dynamic shared hash table for last-start times. */
1060 1086 : 53 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1087 : 53 : dsa_pin(last_start_times_dsa);
1088 : 53 : dsa_pin_mapping(last_start_times_dsa);
660 nathan@postgresql.or 1089 : 53 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1090 : :
1091 : : /* Store handles in shared memory for other backends to use. */
1060 tgl@sss.pgh.pa.us 1092 : 53 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1093 : 53 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1094 : : }
1095 [ + - ]: 192 : else if (!last_start_times)
1096 : : {
1097 : : /* Attach to existing dynamic shared hash table. */
1098 : 192 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1099 : 192 : dsa_pin_mapping(last_start_times_dsa);
1100 : 192 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
194 nathan@postgresql.or 1101 : 192 : LogicalRepCtx->last_start_dsh, NULL);
1102 : : }
1103 : :
1060 tgl@sss.pgh.pa.us 1104 : 245 : MemoryContextSwitchTo(oldcontext);
1105 : 245 : LWLockRelease(LogicalRepWorkerLock);
1106 : : }
1107 : :
1108 : : /*
1109 : : * Set the last-start time for the subscription.
1110 : : */
1111 : : static void
1112 : 201 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1113 : : {
1114 : : LauncherLastStartTimesEntry *entry;
1115 : : bool found;
1116 : :
1117 : 201 : logicalrep_launcher_attach_dshmem();
1118 : :
1119 : 201 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1120 : 201 : entry->last_start_time = start_time;
1121 : 201 : dshash_release_lock(last_start_times, entry);
1122 : 201 : }
1123 : :
1124 : : /*
1125 : : * Return the last-start time for the subscription, or 0 if there isn't one.
1126 : : */
1127 : : static TimestampTz
1128 : 288 : ApplyLauncherGetWorkerStartTime(Oid subid)
1129 : : {
1130 : : LauncherLastStartTimesEntry *entry;
1131 : : TimestampTz ret;
1132 : :
1133 : 288 : logicalrep_launcher_attach_dshmem();
1134 : :
1135 : 288 : entry = dshash_find(last_start_times, &subid, false);
1136 [ + + ]: 288 : if (entry == NULL)
1137 : 124 : return 0;
1138 : :
1139 : 164 : ret = entry->last_start_time;
1140 : 164 : dshash_release_lock(last_start_times, entry);
1141 : :
1142 : 164 : return ret;
1143 : : }
1144 : :
1145 : : /*
1146 : : * Remove the last-start-time entry for the subscription, if one exists.
1147 : : *
1148 : : * This has two use-cases: to remove the entry related to a subscription
1149 : : * that's been deleted or disabled (just to avoid leaking shared memory),
1150 : : * and to allow immediate restart of an apply worker that has exited
1151 : : * due to subscription parameter changes.
1152 : : */
1153 : : void
1154 : 231 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1155 : : {
1156 : 231 : logicalrep_launcher_attach_dshmem();
1157 : :
1158 : 231 : (void) dshash_delete_key(last_start_times, &subid);
1159 : 231 : }
1160 : :
1161 : : /*
1162 : : * Wakeup the launcher on commit if requested.
1163 : : */
1164 : : void
3152 peter_e@gmx.net 1165 : 331830 : AtEOXact_ApplyLauncher(bool isCommit)
1166 : : {
3057 1167 [ + + ]: 331830 : if (isCommit)
1168 : : {
1169 [ + + ]: 305452 : if (on_commit_launcher_wakeup)
1170 : 143 : ApplyLauncherWakeup();
1171 : : }
1172 : :
3152 1173 : 331830 : on_commit_launcher_wakeup = false;
3254 1174 : 331830 : }
1175 : :
1176 : : /*
1177 : : * Request wakeup of the launcher on commit of the transaction.
1178 : : *
 1179                 :            :  * This is used to signal the launcher to stop sleeping and process the
 1180                 :            :  * subscriptions when the current transaction commits. Should be used when
 1181                 :            :  * a new tuple is added to the pg_subscription catalog.
1182 : : */
1183 : : void
1184 : 144 : ApplyLauncherWakeupAtCommit(void)
1185 : : {
3236 heikki.linnakangas@i 1186 [ + + ]: 144 : if (!on_commit_launcher_wakeup)
1187 : 143 : on_commit_launcher_wakeup = true;
3254 peter_e@gmx.net 1188 : 144 : }
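/*
 * The flag set here is consumed by AtEOXact_ApplyLauncher() above: on commit
 * it calls ApplyLauncherWakeup(), which signals the launcher via SIGUSR1; on
 * abort the flag is simply cleared.
 */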
1189 : :
1190 : : /*
1191 : : * Wakeup the launcher immediately.
1192 : : */
1193 : : void
1194 : 721 : ApplyLauncherWakeup(void)
1195 : : {
3164 fujii@postgresql.org 1196 [ + + ]: 721 : if (LogicalRepCtx->launcher_pid != 0)
3254 peter_e@gmx.net 1197 : 697 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1198 : 721 : }
1199 : :
1200 : : /*
1201 : : * Main loop for the apply launcher process.
1202 : : */
1203 : : void
1204 : 413 : ApplyLauncherMain(Datum main_arg)
1205 : : {
3204 tgl@sss.pgh.pa.us 1206 [ + + ]: 413 : ereport(DEBUG1,
1207 : : (errmsg_internal("logical replication launcher started")));
1208 : :
3164 fujii@postgresql.org 1209 : 413 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1210 : :
3114 andres@anarazel.de 1211 [ - + ]: 413 : Assert(LogicalRepCtx->launcher_pid == 0);
1212 : 413 : LogicalRepCtx->launcher_pid = MyProcPid;
1213 : :
1214 : : /* Establish signal handlers. */
1979 akapila@postgresql.o 1215 : 413 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
3114 andres@anarazel.de 1216 : 413 : pqsignal(SIGTERM, die);
3254 peter_e@gmx.net 1217 : 413 : BackgroundWorkerUnblockSignals();
1218 : :
1219 : : /*
1220 : : * Establish connection to nailed catalogs (we only ever access
1221 : : * pg_subscription).
1222 : : */
2813 magnus@hagander.net 1223 : 413 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1224 : :
1225 : : /*
1226 : : * Acquire the conflict detection slot at startup to ensure it can be
1227 : : * dropped if no longer needed after a restart.
1228 : : */
147 akapila@postgresql.o 1229 :GNC 413 : acquire_conflict_slot_if_exists();
1230 : :
1231 : : /* Enter main loop */
1232 : : for (;;)
3254 peter_e@gmx.net 1233 :CBC 2376 : {
1234 : : int rc;
1235 : : List *sublist;
1236 : : ListCell *lc;
1237 : : MemoryContext subctx;
1238 : : MemoryContext oldctx;
3136 bruce@momjian.us 1239 : 2789 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
106 akapila@postgresql.o 1240 :GNC 2789 : bool can_update_xmin = true;
147 1241 : 2789 : bool retain_dead_tuples = false;
1242 : 2789 : TransactionId xmin = InvalidTransactionId;
1243 : :
3114 andres@anarazel.de 1244 [ - + ]:CBC 2789 : CHECK_FOR_INTERRUPTS();
1245 : :
1246 : : /* Use temporary context to avoid leaking memory across cycles. */
1060 tgl@sss.pgh.pa.us 1247 : 2789 : subctx = AllocSetContextCreate(TopMemoryContext,
1248 : : "Logical Replication Launcher sublist",
1249 : : ALLOCSET_DEFAULT_SIZES);
1250 : 2789 : oldctx = MemoryContextSwitchTo(subctx);
1251 : :
1252 : : /*
1253 : : * Start any missing workers for enabled subscriptions.
1254 : : *
1255 : : * Also, during the iteration through all subscriptions, we compute
1256 : : * the minimum XID required to protect deleted tuples for conflict
 1257                 :            :          * detection if one of the subscriptions enables the retain_dead_tuples
1258 : : * option.
1259 : : */
1260 : 2789 : sublist = get_subscription_list();
1261 [ + + + + : 3614 : foreach(lc, sublist)
+ + ]
1262 : : {
1263 : 827 : Subscription *sub = (Subscription *) lfirst(lc);
1264 : : LogicalRepWorker *w;
1265 : : TimestampTz last_start;
1266 : : TimestampTz now;
1267 : : long elapsed;
1268 : :
147 akapila@postgresql.o 1269 [ + + ]:GNC 827 : if (sub->retaindeadtuples)
1270 : : {
1271 : 7 : retain_dead_tuples = true;
1272 : :
1273 : : /*
1274 : : * Create a replication slot to retain information necessary
1275 : : * for conflict detection such as dead tuples, commit
1276 : : * timestamps, and origins.
1277 : : *
1278 : : * The slot is created before starting the apply worker to
1279 : : * prevent it from unnecessarily maintaining its
1280 : : * oldest_nonremovable_xid.
1281 : : *
1282 : : * The slot is created even for a disabled subscription to
1283 : : * ensure that conflict-related information is available when
1284 : : * applying remote changes that occurred before the
1285 : : * subscription was enabled.
1286 : : */
1287 : 7 : CreateConflictDetectionSlot();
1288 : :
106 1289 [ + - ]: 7 : if (sub->retentionactive)
1290 : : {
1291 : : /*
1292 : : * Can't advance xmin of the slot unless all the
1293 : : * subscriptions actively retaining dead tuples are
1294 : : * enabled. This is required to ensure that we don't
1295 : : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1296 : : * the subscriptions is not enabled. Otherwise, we won't
1297 : : * be able to detect conflicts reliably for such a
1298 : : * subscription even though it has set the
1299 : : * retain_dead_tuples option.
1300 : : */
1301 : 7 : can_update_xmin &= sub->enabled;
1302 : :
1303 : : /*
1304 : : * Initialize the slot once the subscription activates
1305 : : * retention.
1306 : : */
1307 [ - + ]: 7 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
106 akapila@postgresql.o 1308 :UNC 0 : init_conflict_slot_xmin();
1309 : : }
1310 : : }
1311 : :
1060 tgl@sss.pgh.pa.us 1312 [ + + ]:CBC 827 : if (!sub->enabled)
1313 : 44 : continue;
1314 : :
1315 : 783 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
50 akapila@postgresql.o 1316 :GNC 783 : w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
1317 : : false);
1318 : :
1060 tgl@sss.pgh.pa.us 1319 [ + + ]:CBC 783 : if (w != NULL)
1320 : : {
1321 : : /*
1322 : : * Compute the minimum xmin required to protect dead tuples
1323 : : * required for conflict detection among all running apply
1324 : : * workers. This computation is performed while holding
1325 : : * LogicalRepWorkerLock to prevent accessing invalid worker
1326 : : * data, in scenarios where a worker might exit and reset its
1327 : : * state concurrently.
1328 : : */
106 akapila@postgresql.o 1329 [ + + ]:GNC 495 : if (sub->retaindeadtuples &&
1330 [ + - + - ]: 3 : sub->retentionactive &&
1331 : : can_update_xmin)
147 1332 : 3 : compute_min_nonremovable_xid(w, &xmin);
1333 : :
93 1334 : 495 : LWLockRelease(LogicalRepWorkerLock);
1335 : :
1336 : : /* worker is running already */
147 1337 : 495 : continue;
1338 : : }
1339 : :
93 1340 : 288 : LWLockRelease(LogicalRepWorkerLock);
1341 : :
1342 : : /*
1343 : : * Can't advance xmin of the slot unless all the workers
1344 : : * corresponding to subscriptions actively retaining dead tuples
 1345                 :            :          * are running, so disable any further computation of the minimum
 1346                 :            :          * nonremovable xid.
1347 : : */
106 1348 [ + + + - ]: 288 : if (sub->retaindeadtuples && sub->retentionactive)
1349 : 1 : can_update_xmin = false;
1350 : :
1351 : : /*
1352 : : * If the worker is eligible to start now, launch it. Otherwise,
1353 : : * adjust wait_time so that we'll wake up as soon as it can be
1354 : : * started.
1355 : : *
1356 : : * Each subscription's apply worker can only be restarted once per
1357 : : * wal_retrieve_retry_interval, so that errors do not cause us to
1358 : : * repeatedly restart the worker as fast as possible. In cases
1359 : : * where a restart is expected (e.g., subscription parameter
1360 : : * changes), another process should remove the last-start entry
1361 : : * for the subscription so that the worker can be restarted
1362 : : * without waiting for wal_retrieve_retry_interval to elapse.
1363 : : */
1060 tgl@sss.pgh.pa.us 1364 :CBC 288 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1365 : 288 : now = GetCurrentTimestamp();
1366 [ + + ]: 288 : if (last_start == 0 ||
1367 [ + + ]: 164 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1368 : : {
1369 : 201 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
176 1370 [ + + ]: 201 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1371 : 201 : sub->dbid, sub->oid, sub->name,
1372 : : sub->owner, InvalidOid,
1373 : : DSM_HANDLE_INVALID,
106 akapila@postgresql.o 1374 [ + + ]:GNC 202 : sub->retaindeadtuples &&
1375 [ + - ]: 1 : sub->retentionactive))
1376 : : {
1377 : : /*
1378 : : * We get here either if we failed to launch a worker
1379 : : * (perhaps for resource-exhaustion reasons) or if we
1380 : : * launched one but it immediately quit. Either way, it
1381 : : * seems appropriate to try again after
1382 : : * wal_retrieve_retry_interval.
1383 : : */
176 tgl@sss.pgh.pa.us 1384 :CBC 2 : wait_time = Min(wait_time,
1385 : : wal_retrieve_retry_interval);
1386 : : }
1387 : : }
1388 : : else
1389 : : {
1060 1390 : 87 : wait_time = Min(wait_time,
1391 : : wal_retrieve_retry_interval - elapsed);
1392 : : }
1393 : : }
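/*
 * To illustrate the wait_time computation above: assuming the default
 * wal_retrieve_retry_interval of 5000 ms, a subscription whose worker last
 * started 2000 ms ago is not relaunched yet, and wait_time is clamped to
 * the remaining 3000 ms so the launcher wakes up just in time to retry.
 */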
1394 : :
1395 : : /*
1396 : : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1397 : : * that requires us to retain dead tuples. Otherwise, if required,
1398 : : * advance the slot's xmin to protect dead tuples required for the
1399 : : * conflict detection.
1400 : : *
1401 : : * Additionally, if all apply workers for subscriptions with
1402 : : * retain_dead_tuples enabled have requested to stop retention, the
1403 : : * slot's xmin will be set to InvalidTransactionId allowing the
1404 : : * removal of dead tuples.
1405 : : */
147 akapila@postgresql.o 1406 [ + + ]:GNC 2787 : if (MyReplicationSlot)
1407 : : {
1408 [ + + ]: 8 : if (!retain_dead_tuples)
1409 : 1 : ReplicationSlotDropAcquired();
106 1410 [ + + ]: 7 : else if (can_update_xmin)
1411 : 3 : update_conflict_slot_xmin(xmin);
1412 : : }
1413 : :
1414 : : /* Switch back to original memory context. */
1060 tgl@sss.pgh.pa.us 1415 :CBC 2787 : MemoryContextSwitchTo(oldctx);
1416 : : /* Clean the temporary memory. */
1417 : 2787 : MemoryContextDelete(subctx);
1418 : :
1419 : : /* Wait for more work. */
3116 andres@anarazel.de 1420 : 2787 : rc = WaitLatch(MyLatch,
1421 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1422 : : wait_time,
1423 : : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1424 : :
1425 [ + + ]: 2784 : if (rc & WL_LATCH_SET)
1426 : : {
1427 : 2769 : ResetLatch(MyLatch);
1428 [ + + ]: 2769 : CHECK_FOR_INTERRUPTS();
1429 : : }
1430 : :
2192 rhaas@postgresql.org 1431 [ + + ]: 2376 : if (ConfigReloadPending)
1432 : : {
1433 : 42 : ConfigReloadPending = false;
3173 peter_e@gmx.net 1434 : 42 : ProcessConfigFile(PGC_SIGHUP);
1435 : : }
1436 : : }
1437 : :
1438 : : /* Not reachable */
1439 : : }
1440 : :
1441 : : /*
1442 : : * Determine the minimum non-removable transaction ID across all apply workers
1443 : : * for subscriptions that have retain_dead_tuples enabled. Store the result
1444 : : * in *xmin.
1445 : : */
1446 : : static void
147 akapila@postgresql.o 1447 :GNC 3 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1448 : : {
1449 : : TransactionId nonremovable_xid;
1450 : :
1451 [ - + ]: 3 : Assert(worker != NULL);
1452 : :
1453 : : /*
1454 : : * The replication slot for conflict detection must be created before the
1455 : : * worker starts.
1456 : : */
1457 [ - + ]: 3 : Assert(MyReplicationSlot);
1458 : :
1459 [ - + ]: 3 : SpinLockAcquire(&worker->relmutex);
1460 : 3 : nonremovable_xid = worker->oldest_nonremovable_xid;
1461 : 3 : SpinLockRelease(&worker->relmutex);
1462 : :
1463 : : /*
1464 : : * Return if the apply worker has stopped retention concurrently.
1465 : : *
1466 : : * Although this function is invoked only when retentionactive is true,
1467 : : * the apply worker might stop retention after the launcher fetches the
1468 : : * retentionactive flag.
1469 : : */
106 1470 [ - + ]: 3 : if (!TransactionIdIsValid(nonremovable_xid))
106 akapila@postgresql.o 1471 :UNC 0 : return;
1472 : :
147 akapila@postgresql.o 1473 [ - + - - ]:GNC 3 : if (!TransactionIdIsValid(*xmin) ||
147 akapila@postgresql.o 1474 :UNC 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
147 akapila@postgresql.o 1475 :GNC 3 : *xmin = nonremovable_xid;
1476 : : }
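/*
 * Editor's note: the minimum-fold above must use TransactionIdPrecedes(),
 * not a plain "<", because XIDs compare modulo 2^32.  Invented example:
 * for normal XIDs 3 and 4294967000, "<" says 3 comes first, but modular
 * comparison says 3 logically follows 4294967000 once the counter has
 * wrapped, so the minimum kept here would be 4294967000.
 */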
1477 : :
1478 : : /*
1479 : : * Acquire the replication slot used to retain information for conflict
1480 : : * detection, if it exists.
1481 : : *
1482 : : * Return true if successfully acquired, otherwise return false.
1483 : : */
1484 : : static bool
1485 : 413 : acquire_conflict_slot_if_exists(void)
1486 : : {
1487 [ + + ]: 413 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1488 : 412 : return false;
1489 : :
1490 : 1 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1491 : 1 : return true;
1492 : : }
1493 : :
1494 : : /*
1495 : : * Update the xmin of the replication slot used to retain information
1496 : : * required for conflict detection.
1497 : : */
1498 : : static void
106 1499 : 3 : update_conflict_slot_xmin(TransactionId new_xmin)
1500 : : {
147 1501 [ - + ]: 3 : Assert(MyReplicationSlot);
106 1502 [ + - - + ]: 3 : Assert(!TransactionIdIsValid(new_xmin) ||
1503 : : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1504 : :
1505 : : /* Nothing to do if the slot's xmin already has the new value */
147 1506 [ + + ]: 3 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1507 : 2 : return;
1508 : :
1509 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
1510 : 1 : MyReplicationSlot->effective_xmin = new_xmin;
1511 : 1 : MyReplicationSlot->data.xmin = new_xmin;
1512 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1513 : :
1514 [ - + ]: 1 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1515 : :
1516 : 1 : ReplicationSlotMarkDirty();
1517 : 1 : ReplicationSlotsComputeRequiredXmin(false);
1518 : :
1519 : : /*
1520 : : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1521 : : * each time. This is acceptable because all concurrent transactions on
1522 : : * the publisher that require the data preceding the slot's xmin should
1523 : : * have already been applied and flushed on the subscriber before the xmin
1524 : : * is advanced. So, even if the slot's xmin regresses after a restart, it
1525 : : * will be advanced again in the next cycle. Therefore, no data required
1526 : : * for conflict detection will be prematurely removed.
1527 : : */
1528 : 1 : return;
1529 : : }
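/*
 * Editor's sketch: this function deliberately marks the slot dirty without
 * saving it, per the comment above.  If a caller did need the advanced
 * xmin to be durable immediately, the persistent variant would pair the
 * dirty-mark with an explicit save:
 *
 *		ReplicationSlotMarkDirty();
 *		ReplicationSlotSave();
 *
 * Both calls are the real slot API (init_conflict_slot_xmin() below uses
 * exactly this pairing); only their placement here is illustrative.
 */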
1530 : :
1531 : : /*
1532 : : * Initialize the xmin for the conflict detection slot.
1533 : : */
1534 : : static void
106 1535 : 3 : init_conflict_slot_xmin(void)
1536 : : {
1537 : : TransactionId xmin_horizon;
1538 : :
1539 : : /* The replication slot must exist but must not be initialized yet. */
1540 [ + - - + ]: 3 : Assert(MyReplicationSlot &&
1541 : : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1542 : :
147 1543 : 3 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1544 : :
1545 : 3 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1546 : :
1547 [ - + ]: 3 : SpinLockAcquire(&MyReplicationSlot->mutex);
1548 : 3 : MyReplicationSlot->effective_xmin = xmin_horizon;
1549 : 3 : MyReplicationSlot->data.xmin = xmin_horizon;
1550 : 3 : SpinLockRelease(&MyReplicationSlot->mutex);
1551 : :
1552 : 3 : ReplicationSlotsComputeRequiredXmin(true);
1553 : :
1554 : 3 : LWLockRelease(ProcArrayLock);
1555 : :
1556 : : /* Write this slot to disk */
1557 : 3 : ReplicationSlotMarkDirty();
1558 : 3 : ReplicationSlotSave();
1559 : 3 : }
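/*
 * Editor's note: GetOldestSafeDecodingTransactionId() is called, and the
 * slot's xmin installed, while ProcArrayLock is held exclusively;
 * otherwise the global xmin horizon could advance past the computed value
 * before ReplicationSlotsComputeRequiredXmin() accounts for this slot,
 * letting dead tuples the slot is meant to protect be removed.
 */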
1560 : :
1561 : : /*
1562 : : * Create and acquire the replication slot used to retain information for
1563 : : * conflict detection, if it does not already exist.
1564 : : */
1565 : : void
106 1566 : 8 : CreateConflictDetectionSlot(void)
1567 : : {
1568 : : /* Exit early if the replication slot has already been created and acquired */
1569 [ + + ]: 8 : if (MyReplicationSlot)
1570 : 5 : return;
1571 : :
1572 [ + - ]: 3 : ereport(LOG,
1573 : : errmsg("creating replication conflict detection slot"));
1574 : :
1575 : 3 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1576 : : false, false);
1577 : :
1578 : 3 : init_conflict_slot_xmin();
1579 : : }
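/*
 * Editor's usage sketch: the lifecycle of the conflict-detection slot, as
 * reconstructed from the functions in this section (the surrounding
 * control flow in ApplyLauncherMain is elided):
 *
 *		acquire_conflict_slot_if_exists();	(re-acquire after restart)
 *		CreateConflictDetectionSlot();		(create + init xmin if needed)
 *		update_conflict_slot_xmin(xmin);	(advance xmin each cycle)
 *		ReplicationSlotDropAcquired();		(drop when no subscription
 *											 retains dead tuples)
 */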
1580 : :
1581 : : /*
1582 : : * Is current process the logical replication launcher?
1583 : : */
1584 : : bool
3114 andres@anarazel.de 1585 :CBC 2358 : IsLogicalLauncher(void)
1586 : : {
1587 : 2358 : return LogicalRepCtx->launcher_pid == MyProcPid;
1588 : : }
1589 : :
1590 : : /*
1591 : : * Return the pid of the leader apply worker if the given pid is the pid of a
1592 : : * parallel apply worker; otherwise, return InvalidPid.
1593 : : */
1594 : : pid_t
1064 akapila@postgresql.o 1595 : 866 : GetLeaderApplyWorkerPid(pid_t pid)
1596 : : {
1597 : 866 : int leader_pid = InvalidPid;
1598 : : int i;
1599 : :
1600 : 866 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1601 : :
1602 [ + + ]: 4330 : for (i = 0; i < max_logical_replication_workers; i++)
1603 : : {
1604 : 3464 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1605 : :
1606 [ + + - + : 3464 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
- - - - ]
1607 : : {
1064 akapila@postgresql.o 1608 :UBC 0 : leader_pid = w->leader_pid;
1609 : 0 : break;
1610 : : }
1611 : : }
1612 : :
1064 akapila@postgresql.o 1613 :CBC 866 : LWLockRelease(LogicalRepWorkerLock);
1614 : :
1615 : 866 : return leader_pid;
1616 : : }
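/*
 * Editor's sketch (hypothetical helper, not in launcher.c): the same
 * scan-under-LogicalRepWorkerLock pattern as above, reduced to counting
 * live parallel apply workers.  Reading w->proc without the shared lock
 * would race with worker exit, hence the lock is held for the whole scan.
 */
#ifdef WORKER_SCAN_SKETCH
static int
count_parallel_apply_workers(void)
{
	int			count = 0;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
	for (int i = 0; i < max_logical_replication_workers; i++)
	{
		LogicalRepWorker *w = &LogicalRepCtx->workers[i];

		if (isParallelApplyWorker(w) && w->proc)
			count++;
	}
	LWLockRelease(LogicalRepWorkerLock);

	return count;
}
#endif							/* WORKER_SCAN_SKETCH */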
1617 : :
1618 : : /*
1619 : : * Returns the state of the subscriptions.
1620 : : */
1621 : : Datum
3254 peter_e@gmx.net 1622 : 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1623 : : {
1624 : : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1625 [ - + ]: 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1626 : : int i;
1627 : 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1628 : :
1156 michael@paquier.xyz 1629 : 1 : InitMaterializedSRF(fcinfo, 0);
1630 : :
1631 : : /* Make sure we get a consistent view of the workers. */
3254 peter_e@gmx.net 1632 : 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1633 : :
1289 tgl@sss.pgh.pa.us 1634 [ + + ]: 5 : for (i = 0; i < max_logical_replication_workers; i++)
1635 : : {
1636 : : /* for each row */
1250 peter@eisentraut.org 1637 : 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1638 : 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1639 : : int worker_pid;
1640 : : LogicalRepWorker worker;
1641 : :
3254 peter_e@gmx.net 1642 : 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1643 : : sizeof(LogicalRepWorker));
1644 [ + + - + ]: 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1645 : 2 : continue;
1646 : :
1647 [ - + - - ]: 2 : if (OidIsValid(subid) && worker.subid != subid)
3254 peter_e@gmx.net 1648 :UBC 0 : continue;
1649 : :
3254 peter_e@gmx.net 1650 :CBC 2 : worker_pid = worker.proc->pid;
1651 : :
1652 : 2 : values[0] = ObjectIdGetDatum(worker.subid);
42 akapila@postgresql.o 1653 [ + - - + ]:GNC 2 : if (isTableSyncWorker(&worker))
3191 peter_e@gmx.net 1654 :UBC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1655 : : else
3191 peter_e@gmx.net 1656 :CBC 2 : nulls[1] = true;
1657 : 2 : values[2] = Int32GetDatum(worker_pid);
1658 : :
1064 akapila@postgresql.o 1659 [ + - - + ]: 2 : if (isParallelApplyWorker(&worker))
1064 akapila@postgresql.o 1660 :UBC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1661 : : else
3191 peter_e@gmx.net 1662 :CBC 2 : nulls[3] = true;
1663 : :
41 alvherre@kurilemu.de 1664 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(worker.last_lsn))
1064 akapila@postgresql.o 1665 :UBC 0 : nulls[4] = true;
1666 : : else
1064 akapila@postgresql.o 1667 :CBC 2 : values[4] = LSNGetDatum(worker.last_lsn);
3254 peter_e@gmx.net 1668 [ - + ]: 2 : if (worker.last_send_time == 0)
1064 akapila@postgresql.o 1669 :UBC 0 : nulls[5] = true;
1670 : : else
1064 akapila@postgresql.o 1671 :CBC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
3254 peter_e@gmx.net 1672 [ - + ]: 2 : if (worker.last_recv_time == 0)
1064 akapila@postgresql.o 1673 :UBC 0 : nulls[6] = true;
1674 : : else
1064 akapila@postgresql.o 1675 :CBC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
41 alvherre@kurilemu.de 1676 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(worker.reply_lsn))
1064 akapila@postgresql.o 1677 :UBC 0 : nulls[7] = true;
1678 : : else
1064 akapila@postgresql.o 1679 :CBC 2 : values[7] = LSNGetDatum(worker.reply_lsn);
3254 peter_e@gmx.net 1680 [ - + ]: 2 : if (worker.reply_time == 0)
1064 akapila@postgresql.o 1681 :UBC 0 : nulls[8] = true;
1682 : : else
1064 akapila@postgresql.o 1683 :CBC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1684 : :
814 nathan@postgresql.or 1685 [ + - - - : 2 : switch (worker.type)
- - ]
1686 : : {
1687 : 2 : case WORKERTYPE_APPLY:
1688 : 2 : values[9] = CStringGetTextDatum("apply");
1689 : 2 : break;
814 nathan@postgresql.or 1690 :UBC 0 : case WORKERTYPE_PARALLEL_APPLY:
1691 : 0 : values[9] = CStringGetTextDatum("parallel apply");
1692 : 0 : break;
42 akapila@postgresql.o 1693 :UNC 0 : case WORKERTYPE_SEQUENCESYNC:
1694 : 0 : values[9] = CStringGetTextDatum("sequence synchronization");
1695 : 0 : break;
814 nathan@postgresql.or 1696 :UBC 0 : case WORKERTYPE_TABLESYNC:
1697 : 0 : values[9] = CStringGetTextDatum("table synchronization");
1698 : 0 : break;
1699 : 0 : case WORKERTYPE_UNKNOWN:
1700 : : /* Should never happen. */
1701 [ # # ]: 0 : elog(ERROR, "unknown worker type");
1702 : : }
1703 : :
1381 michael@paquier.xyz 1704 :CBC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 : : values, nulls);
1706 : :
1707 : : /*
1708 : : * If only a single subscription was requested, and we found it,
1709 : : * break.
1710 : : */
3254 peter_e@gmx.net 1711 [ - + ]: 2 : if (OidIsValid(subid))
3254 peter_e@gmx.net 1712 :UBC 0 : break;
1713 : : }
1714 : :
3254 peter_e@gmx.net 1715 :CBC 1 : LWLockRelease(LogicalRepWorkerLock);
1716 : :
1717 : 1 : return (Datum) 0;
1718 : : }
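/*
 * Editor's sketch (hypothetical function): the minimal shape of a
 * materialized set-returning function like pg_stat_get_subscription()
 * above.  InitMaterializedSRF() sets up the tuplestore and result tuple
 * descriptor from the function's declared signature; each call to
 * tuplestore_putvalues() emits one row; returning (Datum) 0 is the
 * convention for materialized SRFs.
 */
#ifdef SRF_SKETCH
Datum
pg_stat_get_example(PG_FUNCTION_ARGS)
{
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	Datum		values[1];
	bool		nulls[1] = {0};

	InitMaterializedSRF(fcinfo, 0);

	/* Emit a single one-column row. */
	values[0] = Int32GetDatum(42);
	tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

	return (Datum) 0;
}
#endif							/* SRF_SKETCH */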
|