Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * launcher.c
3 : : * PostgreSQL logical replication worker launcher process
4 : : *
5 : : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/launcher.c
9 : : *
10 : : * NOTES
11 : : * This module contains the logical replication worker launcher which
12 : : * uses the background worker infrastructure to start the logical
13 : : * replication workers for every enabled subscription.
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : :
18 : : #include "postgres.h"
19 : :
20 : : #include "access/heapam.h"
21 : : #include "access/htup.h"
22 : : #include "access/htup_details.h"
23 : : #include "access/tableam.h"
24 : : #include "access/xact.h"
25 : : #include "catalog/pg_subscription.h"
26 : : #include "catalog/pg_subscription_rel.h"
27 : : #include "funcapi.h"
28 : : #include "lib/dshash.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "postmaster/bgworker.h"
32 : : #include "postmaster/interrupt.h"
33 : : #include "replication/logicallauncher.h"
34 : : #include "replication/origin.h"
35 : : #include "replication/slot.h"
36 : : #include "replication/walreceiver.h"
37 : : #include "replication/worker_internal.h"
38 : : #include "storage/ipc.h"
39 : : #include "storage/proc.h"
40 : : #include "storage/procarray.h"
41 : : #include "tcop/tcopprot.h"
42 : : #include "utils/builtins.h"
43 : : #include "utils/memutils.h"
44 : : #include "utils/pg_lsn.h"
45 : : #include "utils/snapmgr.h"
46 : : #include "utils/syscache.h"
47 : :
48 : : /* max sleep time between cycles (3min) */
49 : : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 : :
51 : : /* GUC variables */
52 : : int max_logical_replication_workers = 4;
53 : : int max_sync_workers_per_subscription = 2;
54 : : int max_parallel_apply_workers_per_subscription = 2;
55 : :
56 : : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 : :
58 : : typedef struct LogicalRepCtxStruct
59 : : {
60 : : /* Supervisor process. */
61 : : pid_t launcher_pid;
62 : :
63 : : /* Hash table holding last start times of subscriptions' apply workers. */
64 : : dsa_handle last_start_dsa;
65 : : dshash_table_handle last_start_dsh;
66 : :
67 : : /* Background workers. */
68 : : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : : } LogicalRepCtxStruct;
70 : :
71 : : static LogicalRepCtxStruct *LogicalRepCtx;
72 : :
73 : : /* an entry in the last-start-times shared hash table */
74 : : typedef struct LauncherLastStartTimesEntry
75 : : {
76 : : Oid subid; /* OID of logrep subscription (hash key) */
77 : : TimestampTz last_start_time; /* last time its apply worker was started */
78 : : } LauncherLastStartTimesEntry;
79 : :
80 : : /* parameters for the last-start-times shared hash table */
81 : : static const dshash_parameters dsh_params = {
82 : : sizeof(Oid),
83 : : sizeof(LauncherLastStartTimesEntry),
84 : : dshash_memcmp,
85 : : dshash_memhash,
86 : : dshash_memcpy,
87 : : LWTRANCHE_LAUNCHER_HASH
88 : : };
89 : :
90 : : static dsa_area *last_start_times_dsa = NULL;
91 : : static dshash_table *last_start_times = NULL;
92 : :
93 : : static bool on_commit_launcher_wakeup = false;
94 : :
95 : :
96 : : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : : static void logicalrep_worker_onexit(int code, Datum arg);
98 : : static void logicalrep_worker_detach(void);
99 : : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : : static int logicalrep_pa_worker_count(Oid subid);
101 : : static void logicalrep_launcher_attach_dshmem(void);
102 : : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : : static bool acquire_conflict_slot_if_exists(void);
106 : : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : : static void init_conflict_slot_xmin(void);
108 : :
109 : :
110 : : /*
111 : : * Load the list of subscriptions.
112 : : *
113 : : * Only the fields interesting for worker start/stop functions are filled for
114 : : * each subscription.
115 : : */
116 : : static List *
3204 peter_e@gmx.net 117 :CBC 2716 : get_subscription_list(void)
118 : : {
119 : 2716 : List *res = NIL;
120 : : Relation rel;
121 : : TableScanDesc scan;
122 : : HeapTuple tup;
123 : : MemoryContext resultcxt;
124 : :
125 : : /* This is the context that we will allocate our output data in */
126 : 2716 : resultcxt = CurrentMemoryContext;
127 : :
128 : : /*
129 : : * Start a transaction so we can access pg_subscription.
130 : : */
131 : 2716 : StartTransactionCommand();
132 : :
2472 andres@anarazel.de 133 : 2716 : rel = table_open(SubscriptionRelationId, AccessShareLock);
2423 134 : 2716 : scan = table_beginscan_catalog(rel, 0, NULL);
135 : :
3204 peter_e@gmx.net 136 [ + + ]: 3533 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : : {
138 : 817 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : : Subscription *sub;
140 : : MemoryContext oldcxt;
141 : :
142 : : /*
143 : : * Allocate our results in the caller's context, not the
144 : : * transaction's. We do this inside the loop, and restore the original
145 : : * context at the end, so that leaky things like heap_getnext() are
146 : : * not called in a potentially long-lived context.
147 : : */
148 : 817 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 : :
3119 150 : 817 : sub = (Subscription *) palloc0(sizeof(Subscription));
2534 andres@anarazel.de 151 : 817 : sub->oid = subform->oid;
3204 peter_e@gmx.net 152 : 817 : sub->dbid = subform->subdbid;
153 : 817 : sub->owner = subform->subowner;
154 : 817 : sub->enabled = subform->subenabled;
155 : 817 : sub->name = pstrdup(NameStr(subform->subname));
97 akapila@postgresql.o 156 :GNC 817 : sub->retaindeadtuples = subform->subretaindeadtuples;
56 157 : 817 : sub->retentionactive = subform->subretentionactive;
158 : : /* We don't fill fields we are not interested in. */
159 : :
3204 peter_e@gmx.net 160 :CBC 817 : res = lappend(res, sub);
161 : 817 : MemoryContextSwitchTo(oldcxt);
162 : : }
163 : :
2423 andres@anarazel.de 164 : 2716 : table_endscan(scan);
2472 165 : 2716 : table_close(rel, AccessShareLock);
166 : :
3204 peter_e@gmx.net 167 : 2716 : CommitTransactionCommand();
168 : :
169 : 2716 : return res;
170 : : }
171 : :
172 : : /*
173 : : * Wait for a background worker to start up and attach to the shmem context.
174 : : *
175 : : * This is only needed for cleaning up the shared memory in case the worker
176 : : * fails to attach.
177 : : *
178 : : * Returns whether the attach was successful.
179 : : */
180 : : static bool
181 : 405 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : : uint16 generation,
183 : : BackgroundWorkerHandle *handle)
184 : : {
126 tgl@sss.pgh.pa.us 185 : 405 : bool result = false;
186 : 405 : bool dropped_latch = false;
187 : :
188 : : for (;;)
3204 peter_e@gmx.net 189 : 1107 : {
190 : : BgwHandleStatus status;
191 : : pid_t pid;
192 : : int rc;
193 : :
194 [ + + ]: 1512 : CHECK_FOR_INTERRUPTS();
195 : :
3107 196 : 1511 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 : :
198 : : /* Worker either died or has started. Return false if died. */
199 [ + + + + ]: 1511 : if (!worker->in_use || worker->proc)
200 : : {
126 tgl@sss.pgh.pa.us 201 : 402 : result = worker->in_use;
3107 peter_e@gmx.net 202 : 402 : LWLockRelease(LogicalRepWorkerLock);
126 tgl@sss.pgh.pa.us 203 : 402 : break;
204 : : }
205 : :
3107 peter_e@gmx.net 206 : 1109 : LWLockRelease(LogicalRepWorkerLock);
207 : :
208 : : /* Check if worker has died before attaching, and clean up after it. */
3204 209 : 1109 : status = GetBackgroundWorkerPid(handle, &pid);
210 : :
211 [ - + ]: 1109 : if (status == BGWH_STOPPED)
212 : : {
3107 peter_e@gmx.net 213 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : : /* Ensure that this was indeed the worker we waited for. */
215 [ # # ]: 0 : if (generation == worker->generation)
216 : 0 : logicalrep_worker_cleanup(worker);
217 : 0 : LWLockRelease(LogicalRepWorkerLock);
126 tgl@sss.pgh.pa.us 218 : 0 : break; /* result is already false */
219 : : }
220 : :
221 : : /*
222 : : * We need timeout because we generally don't get notified via latch
223 : : * about the worker attach. But we don't expect to have to wait long.
224 : : */
3204 peter_e@gmx.net 225 :CBC 1109 : rc = WaitLatch(MyLatch,
226 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 : :
3066 andres@anarazel.de 229 [ + + ]: 1109 : if (rc & WL_LATCH_SET)
230 : : {
231 : 458 : ResetLatch(MyLatch);
232 [ + + ]: 458 : CHECK_FOR_INTERRUPTS();
126 tgl@sss.pgh.pa.us 233 : 456 : dropped_latch = true;
234 : : }
235 : : }
236 : :
237 : : /*
238 : : * If we had to clear a latch event in order to wait, be sure to restore
239 : : * it before exiting. Otherwise caller may miss events.
240 : : */
241 [ + - ]: 402 : if (dropped_latch)
242 : 402 : SetLatch(MyLatch);
243 : :
244 : 402 : return result;
245 : : }
246 : :
247 : : /*
248 : : * Walks the workers array and searches for one that matches given
249 : : * subscription id and relid.
250 : : *
251 : : * We are only interested in the leader apply worker or table sync worker.
252 : : */
253 : : LogicalRepWorker *
3141 peter_e@gmx.net 254 : 3125 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
255 : : {
256 : : int i;
3086 bruce@momjian.us 257 : 3125 : LogicalRepWorker *res = NULL;
258 : :
3204 peter_e@gmx.net 259 [ - + ]: 3125 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
260 : :
261 : : /* Search for attached worker for a given subscription id. */
262 [ + + ]: 9602 : for (i = 0; i < max_logical_replication_workers; i++)
263 : : {
3086 bruce@momjian.us 264 : 8385 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
265 : :
266 : : /* Skip parallel apply workers. */
1023 akapila@postgresql.o 267 [ + + - + ]: 8385 : if (isParallelApplyWorker(w))
1023 akapila@postgresql.o 268 :UBC 0 : continue;
269 : :
3107 peter_e@gmx.net 270 [ + + + + :CBC 8385 : if (w->in_use && w->subid == subid && w->relid == relid &&
+ + ]
271 [ + + + - ]: 1908 : (!only_running || w->proc))
272 : : {
3204 273 : 1908 : res = w;
274 : 1908 : break;
275 : : }
276 : : }
277 : :
278 : 3125 : return res;
279 : : }
280 : :
281 : : /*
282 : : * Similar to logicalrep_worker_find(), but returns a list of all workers for
283 : : * the subscription, instead of just one.
284 : : */
285 : : List *
461 akapila@postgresql.o 286 : 652 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
287 : : {
288 : : int i;
3007 peter_e@gmx.net 289 : 652 : List *res = NIL;
290 : :
461 akapila@postgresql.o 291 [ + + ]: 652 : if (acquire_lock)
292 : 118 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
293 : :
3007 peter_e@gmx.net 294 [ - + ]: 652 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
295 : :
296 : : /* Search for attached worker for a given subscription id. */
297 [ + + ]: 3388 : for (i = 0; i < max_logical_replication_workers; i++)
298 : : {
299 : 2736 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
300 : :
301 [ + + + + : 2736 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ + + + ]
302 : 469 : res = lappend(res, w);
303 : : }
304 : :
461 akapila@postgresql.o 305 [ + + ]: 652 : if (acquire_lock)
306 : 118 : LWLockRelease(LogicalRepWorkerLock);
307 : :
3007 peter_e@gmx.net 308 : 652 : return res;
309 : : }
310 : :
311 : : /*
312 : : * Start new logical replication background worker, if possible.
313 : : *
314 : : * Returns true on success, false on failure.
315 : : */
316 : : bool
806 akapila@postgresql.o 317 : 405 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
318 : : Oid dbid, Oid subid, const char *subname, Oid userid,
319 : : Oid relid, dsm_handle subworker_dsm,
320 : : bool retain_dead_tuples)
321 : : {
322 : : BackgroundWorker bgw;
323 : : BackgroundWorkerHandle *bgw_handle;
324 : : uint16 generation;
325 : : int i;
3086 bruce@momjian.us 326 : 405 : int slot = 0;
327 : 405 : LogicalRepWorker *worker = NULL;
328 : : int nsyncworkers;
329 : : int nparallelapplyworkers;
330 : : TimestampTz now;
806 akapila@postgresql.o 331 : 405 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
332 : 405 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
333 : :
334 : : /*----------
335 : : * Sanity checks:
336 : : * - must be valid worker type
337 : : * - tablesync workers are only ones to have relid
338 : : * - parallel apply worker is the only kind of subworker
339 : : * - The replication slot used in conflict detection is created when
340 : : * retain_dead_tuples is enabled
341 : : */
342 [ - + ]: 405 : Assert(wtype != WORKERTYPE_UNKNOWN);
343 [ - + ]: 405 : Assert(is_tablesync_worker == OidIsValid(relid));
344 [ - + ]: 405 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
97 akapila@postgresql.o 345 [ + + - + ]:GNC 405 : Assert(!retain_dead_tuples || MyReplicationSlot);
346 : :
3079 peter_e@gmx.net 347 [ + + ]:CBC 405 : ereport(DEBUG1,
348 : : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
349 : : subname)));
350 : :
351 : : /* Report this after the initial starting message for consistency. */
221 msawada@postgresql.o 352 [ - + ]: 405 : if (max_active_replication_origins == 0)
3204 peter_e@gmx.net 353 [ # # ]:UBC 0 : ereport(ERROR,
354 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355 : : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
356 : :
357 : : /*
358 : : * We need to do the modification of the shared memory under lock so that
359 : : * we have consistent view.
360 : : */
3204 peter_e@gmx.net 361 :CBC 405 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
362 : :
3107 363 : 405 : retry:
364 : : /* Find unused worker slot. */
365 [ + - ]: 699 : for (i = 0; i < max_logical_replication_workers; i++)
366 : : {
367 : 699 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
368 : :
369 [ + + ]: 699 : if (!w->in_use)
370 : : {
371 : 405 : worker = w;
372 : 405 : slot = i;
3204 373 : 405 : break;
374 : : }
375 : : }
376 : :
3107 377 : 405 : nsyncworkers = logicalrep_sync_worker_count(subid);
378 : :
379 : 405 : now = GetCurrentTimestamp();
380 : :
381 : : /*
382 : : * If we didn't find a free slot, try to do garbage collection. The
383 : : * reason we do this is because if some worker failed to start up and its
384 : : * parent has crashed while waiting, the in_use state was never cleared.
385 : : */
386 [ + - - + ]: 405 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
387 : : {
3086 bruce@momjian.us 388 :UBC 0 : bool did_cleanup = false;
389 : :
3107 peter_e@gmx.net 390 [ # # ]: 0 : for (i = 0; i < max_logical_replication_workers; i++)
391 : : {
392 : 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
393 : :
394 : : /*
395 : : * If the worker was marked in use but didn't manage to attach in
396 : : * time, clean it up.
397 : : */
398 [ # # # # : 0 : if (w->in_use && !w->proc &&
# # ]
399 : 0 : TimestampDifferenceExceeds(w->launch_time, now,
400 : : wal_receiver_timeout))
401 : : {
402 [ # # ]: 0 : elog(WARNING,
403 : : "logical replication worker for subscription %u took too long to start; canceled",
404 : : w->subid);
405 : :
406 : 0 : logicalrep_worker_cleanup(w);
407 : 0 : did_cleanup = true;
408 : : }
409 : : }
410 : :
411 [ # # ]: 0 : if (did_cleanup)
412 : 0 : goto retry;
413 : : }
414 : :
415 : : /*
416 : : * We don't allow to invoke more sync workers once we have reached the
417 : : * sync worker limit per subscription. So, just return silently as we
418 : : * might get here because of an otherwise harmless race condition.
419 : : */
806 akapila@postgresql.o 420 [ + + - + ]:CBC 405 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
421 : : {
3107 peter_e@gmx.net 422 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
1023 akapila@postgresql.o 423 : 0 : return false;
424 : : }
425 : :
1023 akapila@postgresql.o 426 :CBC 405 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
427 : :
428 : : /*
429 : : * Return false if the number of parallel apply workers reached the limit
430 : : * per subscription.
431 : : */
432 [ + + ]: 405 : if (is_parallel_apply_worker &&
433 [ - + ]: 10 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
434 : : {
1023 akapila@postgresql.o 435 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
436 : 0 : return false;
437 : : }
438 : :
439 : : /*
440 : : * However if there are no more free worker slots, inform user about it
441 : : * before exiting.
442 : : */
3204 peter_e@gmx.net 443 [ - + ]:CBC 405 : if (worker == NULL)
444 : : {
3199 fujii@postgresql.org 445 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3204 peter_e@gmx.net 446 [ # # ]: 0 : ereport(WARNING,
447 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
448 : : errmsg("out of logical replication worker slots"),
449 : : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
1023 akapila@postgresql.o 450 : 0 : return false;
451 : : }
452 : :
453 : : /* Prepare the worker slot. */
806 akapila@postgresql.o 454 :CBC 405 : worker->type = wtype;
3107 peter_e@gmx.net 455 : 405 : worker->launch_time = now;
456 : 405 : worker->in_use = true;
457 : 405 : worker->generation++;
3141 458 : 405 : worker->proc = NULL;
3204 459 : 405 : worker->dbid = dbid;
460 : 405 : worker->userid = userid;
461 : 405 : worker->subid = subid;
3141 462 : 405 : worker->relid = relid;
463 : 405 : worker->relstate = SUBREL_STATE_UNKNOWN;
464 : 405 : worker->relstate_lsn = InvalidXLogRecPtr;
1517 akapila@postgresql.o 465 : 405 : worker->stream_fileset = NULL;
1014 466 [ + + ]: 405 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
1023 467 : 405 : worker->parallel_apply = is_parallel_apply_worker;
97 akapila@postgresql.o 468 :GNC 405 : worker->oldest_nonremovable_xid = retain_dead_tuples
469 : 1 : ? MyReplicationSlot->data.xmin
470 [ + + ]: 405 : : InvalidTransactionId;
3141 peter_e@gmx.net 471 :CBC 405 : worker->last_lsn = InvalidXLogRecPtr;
472 : 405 : TIMESTAMP_NOBEGIN(worker->last_send_time);
473 : 405 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
474 : 405 : worker->reply_lsn = InvalidXLogRecPtr;
475 : 405 : TIMESTAMP_NOBEGIN(worker->reply_time);
476 : :
477 : : /* Before releasing lock, remember generation for future identification. */
2962 tgl@sss.pgh.pa.us 478 : 405 : generation = worker->generation;
479 : :
3204 peter_e@gmx.net 480 : 405 : LWLockRelease(LogicalRepWorkerLock);
481 : :
482 : : /* Register the new dynamic worker. */
3117 tgl@sss.pgh.pa.us 483 : 405 : memset(&bgw, 0, sizeof(bgw));
3086 bruce@momjian.us 484 : 405 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
485 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3204 peter_e@gmx.net 486 : 405 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
848 nathan@postgresql.or 487 : 405 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
488 : :
798 akapila@postgresql.o 489 [ + + + - : 405 : switch (worker->type)
- ]
490 : : {
491 : 199 : case WORKERTYPE_APPLY:
492 : 199 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
493 : 199 : snprintf(bgw.bgw_name, BGW_MAXLEN,
494 : : "logical replication apply worker for subscription %u",
495 : : subid);
496 : 199 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
497 : 199 : break;
498 : :
499 : 10 : case WORKERTYPE_PARALLEL_APPLY:
500 : 10 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
501 : 10 : snprintf(bgw.bgw_name, BGW_MAXLEN,
502 : : "logical replication parallel apply worker for subscription %u",
503 : : subid);
504 : 10 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
505 : :
506 : 10 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
507 : 10 : break;
508 : :
509 : 196 : case WORKERTYPE_TABLESYNC:
510 : 196 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
511 : 196 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : : "logical replication tablesync worker for subscription %u sync %u",
513 : : subid,
514 : : relid);
515 : 196 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
516 : 196 : break;
517 : :
798 akapila@postgresql.o 518 :UBC 0 : case WORKERTYPE_UNKNOWN:
519 : : /* Should never happen. */
520 [ # # ]: 0 : elog(ERROR, "unknown worker type");
521 : : }
522 : :
3204 peter_e@gmx.net 523 :CBC 405 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
524 : 405 : bgw.bgw_notify_pid = MyProcPid;
3114 fujii@postgresql.org 525 : 405 : bgw.bgw_main_arg = Int32GetDatum(slot);
526 : :
3204 peter_e@gmx.net 527 [ - + ]: 405 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
528 : : {
529 : : /* Failed to start worker, so clean up the worker slot. */
2962 tgl@sss.pgh.pa.us 530 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
531 [ # # ]: 0 : Assert(generation == worker->generation);
532 : 0 : logicalrep_worker_cleanup(worker);
533 : 0 : LWLockRelease(LogicalRepWorkerLock);
534 : :
3204 peter_e@gmx.net 535 [ # # ]: 0 : ereport(WARNING,
536 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
537 : : errmsg("out of background worker slots"),
538 : : errhint("You might need to increase \"%s\".", "max_worker_processes")));
1023 akapila@postgresql.o 539 : 0 : return false;
540 : : }
541 : :
542 : : /* Now wait until it attaches. */
1023 akapila@postgresql.o 543 :CBC 405 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
544 : : }
545 : :
546 : : /*
547 : : * Internal function to stop the worker and wait until it detaches from the
548 : : * slot.
549 : : */
550 : : static void
551 : 81 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
552 : : {
553 : : uint16 generation;
554 : :
555 [ - + ]: 81 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
556 : :
557 : : /*
558 : : * Remember which generation was our worker so we can check if what we see
559 : : * is still the same one.
560 : : */
3107 peter_e@gmx.net 561 : 81 : generation = worker->generation;
562 : :
563 : : /*
564 : : * If we found a worker but it does not have proc set then it is still
565 : : * starting up; wait for it to finish starting and then kill it.
566 : : */
567 [ + - + + ]: 81 : while (worker->in_use && !worker->proc)
568 : : {
569 : : int rc;
570 : :
3204 571 : 1 : LWLockRelease(LogicalRepWorkerLock);
572 : :
573 : : /* Wait a bit --- we don't expect to have to wait long. */
3066 andres@anarazel.de 574 : 1 : rc = WaitLatch(MyLatch,
575 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
576 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
577 : :
578 [ - + ]: 1 : if (rc & WL_LATCH_SET)
579 : : {
3066 andres@anarazel.de 580 :UBC 0 : ResetLatch(MyLatch);
581 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
582 : : }
583 : :
584 : : /* Recheck worker status. */
3204 peter_e@gmx.net 585 :CBC 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
586 : :
587 : : /*
588 : : * Check whether the worker slot is no longer used, which would mean
589 : : * that the worker has exited, or whether the worker generation is
590 : : * different, meaning that a different worker has taken the slot.
591 : : */
3107 592 [ + - - + ]: 1 : if (!worker->in_use || worker->generation != generation)
3200 peter_e@gmx.net 593 :UBC 0 : return;
594 : :
595 : : /* Worker has assigned proc, so it has started. */
3200 peter_e@gmx.net 596 [ + - ]:CBC 1 : if (worker->proc)
3204 597 : 1 : break;
598 : : }
599 : :
600 : : /* Now terminate the worker ... */
1023 akapila@postgresql.o 601 : 81 : kill(worker->proc->pid, signo);
602 : :
603 : : /* ... and wait for it to die. */
604 : : for (;;)
3204 peter_e@gmx.net 605 : 105 : {
606 : : int rc;
607 : :
608 : : /* is it gone? */
3107 609 [ + + + + ]: 186 : if (!worker->proc || worker->generation != generation)
610 : : break;
611 : :
3041 tgl@sss.pgh.pa.us 612 : 105 : LWLockRelease(LogicalRepWorkerLock);
613 : :
614 : : /* Wait a bit --- we don't expect to have to wait long. */
3066 andres@anarazel.de 615 : 105 : rc = WaitLatch(MyLatch,
616 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
617 : : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
618 : :
619 [ + + ]: 105 : if (rc & WL_LATCH_SET)
620 : : {
621 : 29 : ResetLatch(MyLatch);
622 [ + + ]: 29 : CHECK_FOR_INTERRUPTS();
623 : : }
624 : :
3041 tgl@sss.pgh.pa.us 625 : 105 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626 : : }
627 : : }
628 : :
629 : : /*
630 : : * Stop the logical replication worker for subid/relid, if any.
631 : : */
632 : : void
1023 akapila@postgresql.o 633 : 96 : logicalrep_worker_stop(Oid subid, Oid relid)
634 : : {
635 : : LogicalRepWorker *worker;
636 : :
637 : 96 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
638 : :
639 : 96 : worker = logicalrep_worker_find(subid, relid, false);
640 : :
641 [ + + ]: 96 : if (worker)
642 : : {
643 [ + - - + ]: 74 : Assert(!isParallelApplyWorker(worker));
644 : 74 : logicalrep_worker_stop_internal(worker, SIGTERM);
645 : : }
646 : :
647 : 96 : LWLockRelease(LogicalRepWorkerLock);
648 : 96 : }
649 : :
650 : : /*
651 : : * Stop the given logical replication parallel apply worker.
652 : : *
653 : : * Node that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
654 : : * worker so that the worker exits cleanly.
655 : : */
656 : : void
903 657 : 5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
658 : : {
659 : : int slot_no;
660 : : uint16 generation;
661 : : LogicalRepWorker *worker;
662 : :
663 [ - + ]: 5 : SpinLockAcquire(&winfo->shared->mutex);
664 : 5 : generation = winfo->shared->logicalrep_worker_generation;
665 : 5 : slot_no = winfo->shared->logicalrep_worker_slot_no;
666 : 5 : SpinLockRelease(&winfo->shared->mutex);
667 : :
1023 668 [ + - - + ]: 5 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
669 : :
670 : : /*
671 : : * Detach from the error_mq_handle for the parallel apply worker before
672 : : * stopping it. This prevents the leader apply worker from trying to
673 : : * receive the message from the error queue that might already be detached
674 : : * by the parallel apply worker.
675 : : */
903 676 [ + - ]: 5 : if (winfo->error_mq_handle)
677 : : {
678 : 5 : shm_mq_detach(winfo->error_mq_handle);
679 : 5 : winfo->error_mq_handle = NULL;
680 : : }
681 : :
1023 682 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
683 : :
684 : 5 : worker = &LogicalRepCtx->workers[slot_no];
685 [ + - - + ]: 5 : Assert(isParallelApplyWorker(worker));
686 : :
687 : : /*
688 : : * Only stop the worker if the generation matches and the worker is alive.
689 : : */
690 [ + - + - ]: 5 : if (worker->generation == generation && worker->proc)
34 691 : 5 : logicalrep_worker_stop_internal(worker, SIGUSR2);
692 : :
3041 tgl@sss.pgh.pa.us 693 : 5 : LWLockRelease(LogicalRepWorkerLock);
3204 peter_e@gmx.net 694 : 5 : }
695 : :
696 : : /*
697 : : * Wake up (using latch) any logical replication worker for specified sub/rel.
698 : : */
699 : : void
3141 700 : 211 : logicalrep_worker_wakeup(Oid subid, Oid relid)
701 : : {
702 : : LogicalRepWorker *worker;
703 : :
704 : 211 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 : :
706 : 211 : worker = logicalrep_worker_find(subid, relid, true);
707 : :
708 [ + - ]: 211 : if (worker)
709 : 211 : logicalrep_worker_wakeup_ptr(worker);
710 : :
3042 tgl@sss.pgh.pa.us 711 : 211 : LWLockRelease(LogicalRepWorkerLock);
3141 peter_e@gmx.net 712 : 211 : }
713 : :
714 : : /*
715 : : * Wake up (using latch) the specified logical replication worker.
716 : : *
717 : : * Caller must hold lock, else worker->proc could change under us.
718 : : */
719 : : void
720 : 646 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
721 : : {
3042 tgl@sss.pgh.pa.us 722 [ - + ]: 646 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
723 : :
3141 peter_e@gmx.net 724 : 646 : SetLatch(&worker->proc->procLatch);
725 : 646 : }
726 : :
727 : : /*
728 : : * Attach to a slot.
729 : : */
730 : : void
3204 731 : 529 : logicalrep_worker_attach(int slot)
732 : : {
733 : : /* Block concurrent access. */
734 : 529 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
735 : :
736 [ + - - + ]: 529 : Assert(slot >= 0 && slot < max_logical_replication_workers);
737 : 529 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
738 : :
3107 739 [ - + ]: 529 : if (!MyLogicalRepWorker->in_use)
740 : : {
3107 peter_e@gmx.net 741 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
742 [ # # ]: 0 : ereport(ERROR,
743 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
744 : : errmsg("logical replication worker slot %d is empty, cannot attach",
745 : : slot)));
746 : : }
747 : :
3204 peter_e@gmx.net 748 [ - + ]:CBC 529 : if (MyLogicalRepWorker->proc)
749 : : {
3107 peter_e@gmx.net 750 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3204 751 [ # # ]: 0 : ereport(ERROR,
752 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
753 : : errmsg("logical replication worker slot %d is already used by "
754 : : "another worker, cannot attach", slot)));
755 : : }
756 : :
3204 peter_e@gmx.net 757 :CBC 529 : MyLogicalRepWorker->proc = MyProc;
758 : 529 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
759 : :
760 : 529 : LWLockRelease(LogicalRepWorkerLock);
761 : 529 : }
762 : :
763 : : /*
764 : : * Stop the parallel apply workers if any, and detach the leader apply worker
765 : : * (cleans up the worker info).
766 : : */
767 : : static void
768 : 529 : logicalrep_worker_detach(void)
769 : : {
770 : : /* Stop the parallel apply workers. */
1023 akapila@postgresql.o 771 [ + + ]: 529 : if (am_leader_apply_worker())
772 : : {
773 : : List *workers;
774 : : ListCell *lc;
775 : :
776 : : /*
777 : : * Detach from the error_mq_handle for all parallel apply workers
778 : : * before terminating them. This prevents the leader apply worker from
779 : : * receiving the worker termination message and sending it to logs
780 : : * when the same is already done by the parallel worker.
781 : : */
782 : 321 : pa_detach_all_error_mq();
783 : :
784 : 321 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
785 : :
461 786 : 321 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
1023 787 [ + - + + : 646 : foreach(lc, workers)
+ + ]
788 : : {
789 : 325 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
790 : :
791 [ + - + + ]: 325 : if (isParallelApplyWorker(w))
792 : 2 : logicalrep_worker_stop_internal(w, SIGTERM);
793 : : }
794 : :
795 : 321 : LWLockRelease(LogicalRepWorkerLock);
796 : :
87 tgl@sss.pgh.pa.us 797 :GNC 321 : list_free(workers);
798 : : }
799 : :
800 : : /* Block concurrent access. */
3204 peter_e@gmx.net 801 :CBC 529 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
802 : :
3107 803 : 529 : logicalrep_worker_cleanup(MyLogicalRepWorker);
804 : :
3204 805 : 529 : LWLockRelease(LogicalRepWorkerLock);
806 : 529 : }
807 : :
808 : : /*
809 : : * Clean up worker info.
810 : : */
811 : : static void
3107 812 : 529 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
813 : : {
814 [ - + ]: 529 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
815 : :
795 akapila@postgresql.o 816 : 529 : worker->type = WORKERTYPE_UNKNOWN;
3107 peter_e@gmx.net 817 : 529 : worker->in_use = false;
818 : 529 : worker->proc = NULL;
819 : 529 : worker->dbid = InvalidOid;
820 : 529 : worker->userid = InvalidOid;
821 : 529 : worker->subid = InvalidOid;
822 : 529 : worker->relid = InvalidOid;
1014 akapila@postgresql.o 823 : 529 : worker->leader_pid = InvalidPid;
1023 824 : 529 : worker->parallel_apply = false;
3107 peter_e@gmx.net 825 : 529 : }
826 : :
827 : : /*
828 : : * Cleanup function for logical replication launcher.
829 : : *
830 : : * Called on logical replication launcher exit.
831 : : */
832 : : static void
3114 fujii@postgresql.org 833 : 405 : logicalrep_launcher_onexit(int code, Datum arg)
834 : : {
835 : 405 : LogicalRepCtx->launcher_pid = 0;
836 : 405 : }
837 : :
838 : : /*
839 : : * Cleanup function.
840 : : *
841 : : * Called on logical replication worker exit.
842 : : */
843 : : static void
3204 peter_e@gmx.net 844 : 529 : logicalrep_worker_onexit(int code, Datum arg)
845 : : {
846 : : /* Disconnect gracefully from the remote side. */
1630 alvherre@alvh.no-ip. 847 [ + + ]: 529 : if (LogRepWorkerWalRcvConn)
848 : 415 : walrcv_disconnect(LogRepWorkerWalRcvConn);
849 : :
3204 peter_e@gmx.net 850 : 529 : logicalrep_worker_detach();
851 : :
852 : : /* Cleanup fileset used for streaming transactions. */
1517 akapila@postgresql.o 853 [ + + ]: 529 : if (MyLogicalRepWorker->stream_fileset != NULL)
854 : 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
855 : :
856 : : /*
857 : : * Session level locks may be acquired outside of a transaction in
858 : : * parallel apply mode and will not be released when the worker
859 : : * terminates, so manually release all locks before the worker exits.
860 : : *
861 : : * The locks will be acquired once the worker is initialized.
862 : : */
909 863 [ + + ]: 529 : if (!InitializingApplyWorker)
864 : 467 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
865 : :
3071 peter_e@gmx.net 866 : 529 : ApplyLauncherWakeup();
3204 867 : 529 : }
868 : :
869 : : /*
870 : : * Count the number of registered (not necessarily running) sync workers
871 : : * for a subscription.
872 : : */
873 : : int
3141 874 : 1310 : logicalrep_sync_worker_count(Oid subid)
875 : : {
876 : : int i;
3086 bruce@momjian.us 877 : 1310 : int res = 0;
878 : :
3141 peter_e@gmx.net 879 [ - + ]: 1310 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
880 : :
881 : : /* Search for attached worker for a given subscription id. */
882 [ + + ]: 6756 : for (i = 0; i < max_logical_replication_workers; i++)
883 : : {
3086 bruce@momjian.us 884 : 5446 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
885 : :
795 akapila@postgresql.o 886 [ + + + + : 5446 : if (isTablesyncWorker(w) && w->subid == subid)
+ + ]
3141 peter_e@gmx.net 887 : 1535 : res++;
888 : : }
889 : :
890 : 1310 : return res;
891 : : }
892 : :
893 : : /*
894 : : * Count the number of registered (but not necessarily running) parallel apply
895 : : * workers for a subscription.
896 : : */
897 : : static int
1023 akapila@postgresql.o 898 : 405 : logicalrep_pa_worker_count(Oid subid)
899 : : {
900 : : int i;
901 : 405 : int res = 0;
902 : :
903 [ - + ]: 405 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
904 : :
905 : : /*
906 : : * Scan all attached parallel apply workers, only counting those which
907 : : * have the given subscription id.
908 : : */
909 [ + + ]: 2149 : for (i = 0; i < max_logical_replication_workers; i++)
910 : : {
911 : 1744 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
912 : :
795 913 [ + + + + : 1744 : if (isParallelApplyWorker(w) && w->subid == subid)
+ - ]
1023 914 : 2 : res++;
915 : : }
916 : :
917 : 405 : return res;
918 : : }
919 : :
920 : : /*
921 : : * ApplyLauncherShmemSize
922 : : * Compute space needed for replication launcher shared memory
923 : : */
924 : : Size
3204 peter_e@gmx.net 925 : 4047 : ApplyLauncherShmemSize(void)
926 : : {
927 : : Size size;
928 : :
929 : : /*
930 : : * Need the fixed struct and the array of LogicalRepWorker.
931 : : */
932 : 4047 : size = sizeof(LogicalRepCtxStruct);
933 : 4047 : size = MAXALIGN(size);
934 : 4047 : size = add_size(size, mul_size(max_logical_replication_workers,
935 : : sizeof(LogicalRepWorker)));
936 : 4047 : return size;
937 : : }
938 : :
939 : : /*
940 : : * ApplyLauncherRegister
941 : : * Register a background worker running the logical replication launcher.
942 : : */
943 : : void
944 : 833 : ApplyLauncherRegister(void)
945 : : {
946 : : BackgroundWorker bgw;
947 : :
948 : : /*
949 : : * The logical replication launcher is disabled during binary upgrades, to
950 : : * prevent logical replication workers from running on the source cluster.
951 : : * That could cause replication origins to move forward after having been
952 : : * copied to the target cluster, potentially creating conflicts with the
953 : : * copied data files.
954 : : */
655 michael@paquier.xyz 955 [ + + + + ]: 833 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
3204 peter_e@gmx.net 956 : 53 : return;
957 : :
3117 tgl@sss.pgh.pa.us 958 : 780 : memset(&bgw, 0, sizeof(bgw));
3086 bruce@momjian.us 959 : 780 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
960 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3204 peter_e@gmx.net 961 : 780 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
848 nathan@postgresql.or 962 : 780 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3133 rhaas@postgresql.org 963 : 780 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
3204 peter_e@gmx.net 964 : 780 : snprintf(bgw.bgw_name, BGW_MAXLEN,
965 : : "logical replication launcher");
2980 966 : 780 : snprintf(bgw.bgw_type, BGW_MAXLEN,
967 : : "logical replication launcher");
3204 968 : 780 : bgw.bgw_restart_time = 5;
969 : 780 : bgw.bgw_notify_pid = 0;
970 : 780 : bgw.bgw_main_arg = (Datum) 0;
971 : :
972 : 780 : RegisterBackgroundWorker(&bgw);
973 : : }
974 : :
975 : : /*
976 : : * ApplyLauncherShmemInit
977 : : * Allocate and initialize replication launcher shared memory
978 : : */
979 : : void
980 : 1049 : ApplyLauncherShmemInit(void)
981 : : {
982 : : bool found;
983 : :
984 : 1049 : LogicalRepCtx = (LogicalRepCtxStruct *)
985 : 1049 : ShmemInitStruct("Logical Replication Launcher Data",
986 : : ApplyLauncherShmemSize(),
987 : : &found);
988 : :
989 [ + - ]: 1049 : if (!found)
990 : : {
991 : : int slot;
992 : :
993 : 1049 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
994 : :
1007 tgl@sss.pgh.pa.us 995 : 1049 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
996 : 1049 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
997 : :
998 : : /* Initialize memory and spin locks for each worker slot. */
3141 peter_e@gmx.net 999 [ + + ]: 5208 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1000 : : {
1001 : 4159 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1002 : :
1003 : 4159 : memset(worker, 0, sizeof(LogicalRepWorker));
1004 : 4159 : SpinLockInit(&worker->relmutex);
1005 : : }
1006 : : }
3204 1007 : 1049 : }
1008 : :
1009 : : /*
1010 : : * Initialize or attach to the dynamic shared hash table that stores the
1011 : : * last-start times, if not already done.
1012 : : * This must be called before accessing the table.
1013 : : */
1014 : : static void
1010 tgl@sss.pgh.pa.us 1015 : 714 : logicalrep_launcher_attach_dshmem(void)
1016 : : {
1017 : : MemoryContext oldcontext;
1018 : :
1019 : : /* Quick exit if we already did this. */
1007 1020 [ + + ]: 714 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
1010 1021 [ + + ]: 661 : last_start_times != NULL)
1022 : 475 : return;
1023 : :
1024 : : /* Otherwise, use a lock to ensure only one process creates the table. */
1025 : 239 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1026 : :
1027 : : /* Be sure any local memory allocated by DSA routines is persistent. */
1028 : 239 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1029 : :
1007 1030 [ + + ]: 239 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1031 : : {
1032 : : /* Initialize dynamic shared hash table for last-start times. */
1010 1033 : 53 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1034 : 53 : dsa_pin(last_start_times_dsa);
1035 : 53 : dsa_pin_mapping(last_start_times_dsa);
610 nathan@postgresql.or 1036 : 53 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1037 : :
1038 : : /* Store handles in shared memory for other backends to use. */
1010 tgl@sss.pgh.pa.us 1039 : 53 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1040 : 53 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1041 : : }
1042 [ + - ]: 186 : else if (!last_start_times)
1043 : : {
1044 : : /* Attach to existing dynamic shared hash table. */
1045 : 186 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1046 : 186 : dsa_pin_mapping(last_start_times_dsa);
1047 : 186 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
144 nathan@postgresql.or 1048 : 186 : LogicalRepCtx->last_start_dsh, NULL);
1049 : : }
1050 : :
1010 tgl@sss.pgh.pa.us 1051 : 239 : MemoryContextSwitchTo(oldcontext);
1052 : 239 : LWLockRelease(LogicalRepWorkerLock);
1053 : : }
1054 : :
1055 : : /*
1056 : : * Set the last-start time for the subscription.
1057 : : */
1058 : : static void
1059 : 199 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1060 : : {
1061 : : LauncherLastStartTimesEntry *entry;
1062 : : bool found;
1063 : :
1064 : 199 : logicalrep_launcher_attach_dshmem();
1065 : :
1066 : 199 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1067 : 199 : entry->last_start_time = start_time;
1068 : 199 : dshash_release_lock(last_start_times, entry);
1069 : 199 : }
1070 : :
1071 : : /*
1072 : : * Return the last-start time for the subscription, or 0 if there isn't one.
1073 : : */
1074 : : static TimestampTz
1075 : 290 : ApplyLauncherGetWorkerStartTime(Oid subid)
1076 : : {
1077 : : LauncherLastStartTimesEntry *entry;
1078 : : TimestampTz ret;
1079 : :
1080 : 290 : logicalrep_launcher_attach_dshmem();
1081 : :
1082 : 290 : entry = dshash_find(last_start_times, &subid, false);
1083 [ + + ]: 290 : if (entry == NULL)
1084 : 124 : return 0;
1085 : :
1086 : 166 : ret = entry->last_start_time;
1087 : 166 : dshash_release_lock(last_start_times, entry);
1088 : :
1089 : 166 : return ret;
1090 : : }
1091 : :
1092 : : /*
1093 : : * Remove the last-start-time entry for the subscription, if one exists.
1094 : : *
1095 : : * This has two use-cases: to remove the entry related to a subscription
1096 : : * that's been deleted or disabled (just to avoid leaking shared memory),
1097 : : * and to allow immediate restart of an apply worker that has exited
1098 : : * due to subscription parameter changes.
1099 : : */
1100 : : void
1101 : 225 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1102 : : {
1103 : 225 : logicalrep_launcher_attach_dshmem();
1104 : :
1105 : 225 : (void) dshash_delete_key(last_start_times, &subid);
1106 : 225 : }
1107 : :
1108 : : /*
1109 : : * Wakeup the launcher on commit if requested.
1110 : : */
1111 : : void
3102 peter_e@gmx.net 1112 : 321814 : AtEOXact_ApplyLauncher(bool isCommit)
1113 : : {
3007 1114 [ + + ]: 321814 : if (isCommit)
1115 : : {
1116 [ + + ]: 296228 : if (on_commit_launcher_wakeup)
1117 : 142 : ApplyLauncherWakeup();
1118 : : }
1119 : :
3102 1120 : 321814 : on_commit_launcher_wakeup = false;
3204 1121 : 321814 : }
1122 : :
1123 : : /*
1124 : : * Request wakeup of the launcher on commit of the transaction.
1125 : : *
1126 : : * This is used to send launcher signal to stop sleeping and process the
1127 : : * subscriptions when current transaction commits. Should be used when new
1128 : : * tuple was added to the pg_subscription catalog.
1129 : : */
1130 : : void
1131 : 143 : ApplyLauncherWakeupAtCommit(void)
1132 : : {
3186 heikki.linnakangas@i 1133 [ + + ]: 143 : if (!on_commit_launcher_wakeup)
1134 : 142 : on_commit_launcher_wakeup = true;
3204 peter_e@gmx.net 1135 : 143 : }
1136 : :
1137 : : /*
1138 : : * Wakeup the launcher immediately.
1139 : : */
1140 : : void
1141 : 705 : ApplyLauncherWakeup(void)
1142 : : {
3114 fujii@postgresql.org 1143 [ + + ]: 705 : if (LogicalRepCtx->launcher_pid != 0)
3204 peter_e@gmx.net 1144 : 681 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1145 : 705 : }
1146 : :
1147 : : /*
1148 : : * Main loop for the apply launcher process.
1149 : : */
1150 : : void
1151 : 405 : ApplyLauncherMain(Datum main_arg)
1152 : : {
3154 tgl@sss.pgh.pa.us 1153 [ + + ]: 405 : ereport(DEBUG1,
1154 : : (errmsg_internal("logical replication launcher started")));
1155 : :
3114 fujii@postgresql.org 1156 : 405 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1157 : :
3064 andres@anarazel.de 1158 [ - + ]: 405 : Assert(LogicalRepCtx->launcher_pid == 0);
1159 : 405 : LogicalRepCtx->launcher_pid = MyProcPid;
1160 : :
1161 : : /* Establish signal handlers. */
1929 akapila@postgresql.o 1162 : 405 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
3064 andres@anarazel.de 1163 : 405 : pqsignal(SIGTERM, die);
3204 peter_e@gmx.net 1164 : 405 : BackgroundWorkerUnblockSignals();
1165 : :
1166 : : /*
1167 : : * Establish connection to nailed catalogs (we only ever access
1168 : : * pg_subscription).
1169 : : */
2763 magnus@hagander.net 1170 : 405 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1171 : :
1172 : : /*
1173 : : * Acquire the conflict detection slot at startup to ensure it can be
1174 : : * dropped if no longer needed after a restart.
1175 : : */
97 akapila@postgresql.o 1176 :GNC 405 : acquire_conflict_slot_if_exists();
1177 : :
1178 : : /* Enter main loop */
1179 : : for (;;)
3204 peter_e@gmx.net 1180 :CBC 2312 : {
1181 : : int rc;
1182 : : List *sublist;
1183 : : ListCell *lc;
1184 : : MemoryContext subctx;
1185 : : MemoryContext oldctx;
3086 bruce@momjian.us 1186 : 2717 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
56 akapila@postgresql.o 1187 :GNC 2717 : bool can_update_xmin = true;
97 1188 : 2717 : bool retain_dead_tuples = false;
1189 : 2717 : TransactionId xmin = InvalidTransactionId;
1190 : :
3064 andres@anarazel.de 1191 [ + + ]:CBC 2717 : CHECK_FOR_INTERRUPTS();
1192 : :
1193 : : /* Use temporary context to avoid leaking memory across cycles. */
1010 tgl@sss.pgh.pa.us 1194 : 2716 : subctx = AllocSetContextCreate(TopMemoryContext,
1195 : : "Logical Replication Launcher sublist",
1196 : : ALLOCSET_DEFAULT_SIZES);
1197 : 2716 : oldctx = MemoryContextSwitchTo(subctx);
1198 : :
1199 : : /*
1200 : : * Start any missing workers for enabled subscriptions.
1201 : : *
1202 : : * Also, during the iteration through all subscriptions, we compute
1203 : : * the minimum XID required to protect deleted tuples for conflict
1204 : : * detection if one of the subscription enables retain_dead_tuples
1205 : : * option.
1206 : : */
1207 : 2716 : sublist = get_subscription_list();
1208 [ + + + + : 3532 : foreach(lc, sublist)
+ + ]
1209 : : {
1210 : 817 : Subscription *sub = (Subscription *) lfirst(lc);
1211 : : LogicalRepWorker *w;
1212 : : TimestampTz last_start;
1213 : : TimestampTz now;
1214 : : long elapsed;
1215 : :
97 akapila@postgresql.o 1216 [ + + ]:GNC 817 : if (sub->retaindeadtuples)
1217 : : {
1218 : 7 : retain_dead_tuples = true;
1219 : :
1220 : : /*
1221 : : * Create a replication slot to retain information necessary
1222 : : * for conflict detection such as dead tuples, commit
1223 : : * timestamps, and origins.
1224 : : *
1225 : : * The slot is created before starting the apply worker to
1226 : : * prevent it from unnecessarily maintaining its
1227 : : * oldest_nonremovable_xid.
1228 : : *
1229 : : * The slot is created even for a disabled subscription to
1230 : : * ensure that conflict-related information is available when
1231 : : * applying remote changes that occurred before the
1232 : : * subscription was enabled.
1233 : : */
1234 : 7 : CreateConflictDetectionSlot();
1235 : :
56 1236 [ + - ]: 7 : if (sub->retentionactive)
1237 : : {
1238 : : /*
1239 : : * Can't advance xmin of the slot unless all the
1240 : : * subscriptions actively retaining dead tuples are
1241 : : * enabled. This is required to ensure that we don't
1242 : : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1243 : : * the subscriptions is not enabled. Otherwise, we won't
1244 : : * be able to detect conflicts reliably for such a
1245 : : * subscription even though it has set the
1246 : : * retain_dead_tuples option.
1247 : : */
1248 : 7 : can_update_xmin &= sub->enabled;
1249 : :
1250 : : /*
1251 : : * Initialize the slot once the subscription activiates
1252 : : * retention.
1253 : : */
1254 [ - + ]: 7 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
56 akapila@postgresql.o 1255 :UNC 0 : init_conflict_slot_xmin();
1256 : : }
1257 : : }
1258 : :
1010 tgl@sss.pgh.pa.us 1259 [ + + ]:CBC 817 : if (!sub->enabled)
1260 : 39 : continue;
1261 : :
1262 : 778 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1263 : 778 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1264 : :
1265 [ + + ]: 778 : if (w != NULL)
1266 : : {
1267 : : /*
1268 : : * Compute the minimum xmin required to protect dead tuples
1269 : : * required for conflict detection among all running apply
1270 : : * workers. This computation is performed while holding
1271 : : * LogicalRepWorkerLock to prevent accessing invalid worker
1272 : : * data, in scenarios where a worker might exit and reset its
1273 : : * state concurrently.
1274 : : */
56 akapila@postgresql.o 1275 [ + + ]:GNC 488 : if (sub->retaindeadtuples &&
1276 [ + - + - ]: 4 : sub->retentionactive &&
1277 : : can_update_xmin)
97 1278 : 4 : compute_min_nonremovable_xid(w, &xmin);
1279 : :
43 1280 : 488 : LWLockRelease(LogicalRepWorkerLock);
1281 : :
1282 : : /* worker is running already */
97 1283 : 488 : continue;
1284 : : }
1285 : :
43 1286 : 290 : LWLockRelease(LogicalRepWorkerLock);
1287 : :
1288 : : /*
1289 : : * Can't advance xmin of the slot unless all the workers
1290 : : * corresponding to subscriptions actively retaining dead tuples
1291 : : * are running, disabling the further computation of the minimum
1292 : : * nonremovable xid.
1293 : : */
56 1294 [ + + + - ]: 290 : if (sub->retaindeadtuples && sub->retentionactive)
1295 : 1 : can_update_xmin = false;
1296 : :
1297 : : /*
1298 : : * If the worker is eligible to start now, launch it. Otherwise,
1299 : : * adjust wait_time so that we'll wake up as soon as it can be
1300 : : * started.
1301 : : *
1302 : : * Each subscription's apply worker can only be restarted once per
1303 : : * wal_retrieve_retry_interval, so that errors do not cause us to
1304 : : * repeatedly restart the worker as fast as possible. In cases
1305 : : * where a restart is expected (e.g., subscription parameter
1306 : : * changes), another process should remove the last-start entry
1307 : : * for the subscription so that the worker can be restarted
1308 : : * without waiting for wal_retrieve_retry_interval to elapse.
1309 : : */
1010 tgl@sss.pgh.pa.us 1310 :CBC 290 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1311 : 290 : now = GetCurrentTimestamp();
1312 [ + + ]: 290 : if (last_start == 0 ||
1313 [ + + ]: 166 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1314 : : {
1315 : 199 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
126 1316 [ + + ]: 199 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1317 : 199 : sub->dbid, sub->oid, sub->name,
1318 : : sub->owner, InvalidOid,
1319 : : DSM_HANDLE_INVALID,
56 akapila@postgresql.o 1320 [ + + ]:GNC 200 : sub->retaindeadtuples &&
1321 [ + - ]: 1 : sub->retentionactive))
1322 : : {
1323 : : /*
1324 : : * We get here either if we failed to launch a worker
1325 : : * (perhaps for resource-exhaustion reasons) or if we
1326 : : * launched one but it immediately quit. Either way, it
1327 : : * seems appropriate to try again after
1328 : : * wal_retrieve_retry_interval.
1329 : : */
126 tgl@sss.pgh.pa.us 1330 :CBC 1 : wait_time = Min(wait_time,
1331 : : wal_retrieve_retry_interval);
1332 : : }
1333 : : }
1334 : : else
1335 : : {
1010 1336 : 91 : wait_time = Min(wait_time,
1337 : : wal_retrieve_retry_interval - elapsed);
1338 : : }
1339 : : }
1340 : :
1341 : : /*
1342 : : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1343 : : * that requires us to retain dead tuples. Otherwise, if required,
1344 : : * advance the slot's xmin to protect dead tuples required for the
1345 : : * conflict detection.
1346 : : *
1347 : : * Additionally, if all apply workers for subscriptions with
1348 : : * retain_dead_tuples enabled have requested to stop retention, the
1349 : : * slot's xmin will be set to InvalidTransactionId allowing the
1350 : : * removal of dead tuples.
1351 : : */
97 akapila@postgresql.o 1352 [ + + ]:GNC 2715 : if (MyReplicationSlot)
1353 : : {
1354 [ + + ]: 8 : if (!retain_dead_tuples)
1355 : 1 : ReplicationSlotDropAcquired();
56 1356 [ + + ]: 7 : else if (can_update_xmin)
1357 : 4 : update_conflict_slot_xmin(xmin);
1358 : : }
1359 : :
1360 : : /* Switch back to original memory context. */
1010 tgl@sss.pgh.pa.us 1361 :CBC 2715 : MemoryContextSwitchTo(oldctx);
1362 : : /* Clean the temporary memory. */
1363 : 2715 : MemoryContextDelete(subctx);
1364 : :
1365 : : /* Wait for more work. */
3066 andres@anarazel.de 1366 : 2715 : rc = WaitLatch(MyLatch,
1367 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1368 : : wait_time,
1369 : : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1370 : :
1371 [ + + ]: 2712 : if (rc & WL_LATCH_SET)
1372 : : {
1373 : 2698 : ResetLatch(MyLatch);
1374 [ + + ]: 2698 : CHECK_FOR_INTERRUPTS();
1375 : : }
1376 : :
2142 rhaas@postgresql.org 1377 [ + + ]: 2312 : if (ConfigReloadPending)
1378 : : {
1379 : 39 : ConfigReloadPending = false;
3123 peter_e@gmx.net 1380 : 39 : ProcessConfigFile(PGC_SIGHUP);
1381 : : }
1382 : : }
1383 : :
1384 : : /* Not reachable */
1385 : : }
1386 : :
1387 : : /*
1388 : : * Determine the minimum non-removable transaction ID across all apply workers
1389 : : * for subscriptions that have retain_dead_tuples enabled. Store the result
1390 : : * in *xmin.
1391 : : */
1392 : : static void
97 akapila@postgresql.o 1393 :GNC 4 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1394 : : {
1395 : : TransactionId nonremovable_xid;
1396 : :
1397 [ - + ]: 4 : Assert(worker != NULL);
1398 : :
1399 : : /*
1400 : : * The replication slot for conflict detection must be created before the
1401 : : * worker starts.
1402 : : */
1403 [ - + ]: 4 : Assert(MyReplicationSlot);
1404 : :
1405 [ - + ]: 4 : SpinLockAcquire(&worker->relmutex);
1406 : 4 : nonremovable_xid = worker->oldest_nonremovable_xid;
1407 : 4 : SpinLockRelease(&worker->relmutex);
1408 : :
1409 : : /*
1410 : : * Return if the apply worker has stopped retention concurrently.
1411 : : *
1412 : : * Although this function is invoked only when retentionactive is true,
1413 : : * the apply worker might stop retention after the launcher fetches the
1414 : : * retentionactive flag.
1415 : : */
56 1416 [ - + ]: 4 : if (!TransactionIdIsValid(nonremovable_xid))
56 akapila@postgresql.o 1417 :UNC 0 : return;
1418 : :
97 akapila@postgresql.o 1419 [ - + - - ]:GNC 4 : if (!TransactionIdIsValid(*xmin) ||
97 akapila@postgresql.o 1420 :UNC 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
97 akapila@postgresql.o 1421 :GNC 4 : *xmin = nonremovable_xid;
1422 : : }
1423 : :
1424 : : /*
1425 : : * Acquire the replication slot used to retain information for conflict
1426 : : * detection, if it exists.
1427 : : *
1428 : : * Return true if successfully acquired, otherwise return false.
1429 : : */
1430 : : static bool
1431 : 405 : acquire_conflict_slot_if_exists(void)
1432 : : {
1433 [ + + ]: 405 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1434 : 404 : return false;
1435 : :
1436 : 1 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1437 : 1 : return true;
1438 : : }
1439 : :
1440 : : /*
1441 : : * Update the xmin the replication slot used to retain information required
1442 : : * for conflict detection.
1443 : : */
1444 : : static void
56 1445 : 4 : update_conflict_slot_xmin(TransactionId new_xmin)
1446 : : {
97 1447 [ - + ]: 4 : Assert(MyReplicationSlot);
56 1448 [ + - - + ]: 4 : Assert(!TransactionIdIsValid(new_xmin) ||
1449 : : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1450 : :
1451 : : /* Return if the xmin value of the slot cannot be updated */
97 1452 [ + + ]: 4 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1453 : 2 : return;
1454 : :
1455 [ - + ]: 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
1456 : 2 : MyReplicationSlot->effective_xmin = new_xmin;
1457 : 2 : MyReplicationSlot->data.xmin = new_xmin;
1458 : 2 : SpinLockRelease(&MyReplicationSlot->mutex);
1459 : :
1460 [ - + ]: 2 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1461 : :
1462 : 2 : ReplicationSlotMarkDirty();
1463 : 2 : ReplicationSlotsComputeRequiredXmin(false);
1464 : :
1465 : : /*
1466 : : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1467 : : * each time. This is acceptable because all concurrent transactions on
1468 : : * the publisher that require the data preceding the slot's xmin should
1469 : : * have already been applied and flushed on the subscriber before the xmin
1470 : : * is advanced. So, even if the slot's xmin regresses after a restart, it
1471 : : * will be advanced again in the next cycle. Therefore, no data required
1472 : : * for conflict detection will be prematurely removed.
1473 : : */
1474 : 2 : return;
1475 : : }
1476 : :
1477 : : /*
1478 : : * Initialize the xmin for the conflict detection slot.
1479 : : */
1480 : : static void
56 1481 : 3 : init_conflict_slot_xmin(void)
1482 : : {
1483 : : TransactionId xmin_horizon;
1484 : :
1485 : : /* Replication slot must exist but shouldn't be initialized. */
1486 [ + - - + ]: 3 : Assert(MyReplicationSlot &&
1487 : : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1488 : :
97 1489 : 3 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1490 : :
1491 : 3 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1492 : :
1493 [ - + ]: 3 : SpinLockAcquire(&MyReplicationSlot->mutex);
1494 : 3 : MyReplicationSlot->effective_xmin = xmin_horizon;
1495 : 3 : MyReplicationSlot->data.xmin = xmin_horizon;
1496 : 3 : SpinLockRelease(&MyReplicationSlot->mutex);
1497 : :
1498 : 3 : ReplicationSlotsComputeRequiredXmin(true);
1499 : :
1500 : 3 : LWLockRelease(ProcArrayLock);
1501 : :
1502 : : /* Write this slot to disk */
1503 : 3 : ReplicationSlotMarkDirty();
1504 : 3 : ReplicationSlotSave();
1505 : 3 : }
1506 : :
1507 : : /*
1508 : : * Create and acquire the replication slot used to retain information for
1509 : : * conflict detection, if not yet.
1510 : : */
1511 : : void
56 1512 : 8 : CreateConflictDetectionSlot(void)
1513 : : {
1514 : : /* Exit early, if the replication slot is already created and acquired */
1515 [ + + ]: 8 : if (MyReplicationSlot)
1516 : 5 : return;
1517 : :
1518 [ + - ]: 3 : ereport(LOG,
1519 : : errmsg("creating replication conflict detection slot"));
1520 : :
1521 : 3 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1522 : : false, false);
1523 : :
1524 : 3 : init_conflict_slot_xmin();
1525 : : }
1526 : :
1527 : : /*
1528 : : * Is current process the logical replication launcher?
1529 : : */
1530 : : bool
3064 andres@anarazel.de 1531 :CBC 2344 : IsLogicalLauncher(void)
1532 : : {
1533 : 2344 : return LogicalRepCtx->launcher_pid == MyProcPid;
1534 : : }
1535 : :
1536 : : /*
1537 : : * Return the pid of the leader apply worker if the given pid is the pid of a
1538 : : * parallel apply worker, otherwise, return InvalidPid.
1539 : : */
1540 : : pid_t
1014 akapila@postgresql.o 1541 : 851 : GetLeaderApplyWorkerPid(pid_t pid)
1542 : : {
1543 : 851 : int leader_pid = InvalidPid;
1544 : : int i;
1545 : :
1546 : 851 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1547 : :
1548 [ + + ]: 4255 : for (i = 0; i < max_logical_replication_workers; i++)
1549 : : {
1550 : 3404 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1551 : :
1552 [ + + - + : 3404 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
- - - - ]
1553 : : {
1014 akapila@postgresql.o 1554 :UBC 0 : leader_pid = w->leader_pid;
1555 : 0 : break;
1556 : : }
1557 : : }
1558 : :
1014 akapila@postgresql.o 1559 :CBC 851 : LWLockRelease(LogicalRepWorkerLock);
1560 : :
1561 : 851 : return leader_pid;
1562 : : }
1563 : :
1564 : : /*
1565 : : * Returns state of the subscriptions.
1566 : : */
1567 : : Datum
3204 peter_e@gmx.net 1568 : 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1569 : : {
1570 : : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1571 [ - + ]: 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1572 : : int i;
1573 : 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1574 : :
1106 michael@paquier.xyz 1575 : 1 : InitMaterializedSRF(fcinfo, 0);
1576 : :
1577 : : /* Make sure we get consistent view of the workers. */
3204 peter_e@gmx.net 1578 : 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1579 : :
1239 tgl@sss.pgh.pa.us 1580 [ + + ]: 5 : for (i = 0; i < max_logical_replication_workers; i++)
1581 : : {
1582 : : /* for each row */
1200 peter@eisentraut.org 1583 : 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1584 : 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1585 : : int worker_pid;
1586 : : LogicalRepWorker worker;
1587 : :
3204 peter_e@gmx.net 1588 : 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1589 : : sizeof(LogicalRepWorker));
1590 [ + + - + ]: 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1591 : 2 : continue;
1592 : :
1593 [ - + - - ]: 2 : if (OidIsValid(subid) && worker.subid != subid)
3204 peter_e@gmx.net 1594 :UBC 0 : continue;
1595 : :
3204 peter_e@gmx.net 1596 :CBC 2 : worker_pid = worker.proc->pid;
1597 : :
1598 : 2 : values[0] = ObjectIdGetDatum(worker.subid);
806 akapila@postgresql.o 1599 [ + - - + ]: 2 : if (isTablesyncWorker(&worker))
3141 peter_e@gmx.net 1600 :UBC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1601 : : else
3141 peter_e@gmx.net 1602 :CBC 2 : nulls[1] = true;
1603 : 2 : values[2] = Int32GetDatum(worker_pid);
1604 : :
1014 akapila@postgresql.o 1605 [ + - - + ]: 2 : if (isParallelApplyWorker(&worker))
1014 akapila@postgresql.o 1606 :UBC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1607 : : else
3141 peter_e@gmx.net 1608 :CBC 2 : nulls[3] = true;
1609 : :
1014 akapila@postgresql.o 1610 [ + + ]: 2 : if (XLogRecPtrIsInvalid(worker.last_lsn))
1014 akapila@postgresql.o 1611 :GBC 1 : nulls[4] = true;
1612 : : else
1014 akapila@postgresql.o 1613 :CBC 1 : values[4] = LSNGetDatum(worker.last_lsn);
3204 peter_e@gmx.net 1614 [ - + ]: 2 : if (worker.last_send_time == 0)
1014 akapila@postgresql.o 1615 :UBC 0 : nulls[5] = true;
1616 : : else
1014 akapila@postgresql.o 1617 :CBC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
3204 peter_e@gmx.net 1618 [ - + ]: 2 : if (worker.last_recv_time == 0)
1014 akapila@postgresql.o 1619 :UBC 0 : nulls[6] = true;
1620 : : else
1014 akapila@postgresql.o 1621 :CBC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
3204 peter_e@gmx.net 1622 [ + + ]: 2 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
1014 akapila@postgresql.o 1623 :GBC 1 : nulls[7] = true;
1624 : : else
1014 akapila@postgresql.o 1625 :CBC 1 : values[7] = LSNGetDatum(worker.reply_lsn);
3204 peter_e@gmx.net 1626 [ - + ]: 2 : if (worker.reply_time == 0)
1014 akapila@postgresql.o 1627 :UBC 0 : nulls[8] = true;
1628 : : else
1014 akapila@postgresql.o 1629 :CBC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1630 : :
764 nathan@postgresql.or 1631 [ + - - - : 2 : switch (worker.type)
- ]
1632 : : {
1633 : 2 : case WORKERTYPE_APPLY:
1634 : 2 : values[9] = CStringGetTextDatum("apply");
1635 : 2 : break;
764 nathan@postgresql.or 1636 :UBC 0 : case WORKERTYPE_PARALLEL_APPLY:
1637 : 0 : values[9] = CStringGetTextDatum("parallel apply");
1638 : 0 : break;
1639 : 0 : case WORKERTYPE_TABLESYNC:
1640 : 0 : values[9] = CStringGetTextDatum("table synchronization");
1641 : 0 : break;
1642 : 0 : case WORKERTYPE_UNKNOWN:
1643 : : /* Should never happen. */
1644 [ # # ]: 0 : elog(ERROR, "unknown worker type");
1645 : : }
1646 : :
1331 michael@paquier.xyz 1647 :CBC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1648 : : values, nulls);
1649 : :
1650 : : /*
1651 : : * If only a single subscription was requested, and we found it,
1652 : : * break.
1653 : : */
3204 peter_e@gmx.net 1654 [ - + ]: 2 : if (OidIsValid(subid))
3204 peter_e@gmx.net 1655 :UBC 0 : break;
1656 : : }
1657 : :
3204 peter_e@gmx.net 1658 :CBC 1 : LWLockRelease(LogicalRepWorkerLock);
1659 : :
1660 : 1 : return (Datum) 0;
1661 : : }
|