Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * launcher.c
3 : : * PostgreSQL logical replication worker launcher process
4 : : *
5 : : * Copyright (c) 2016-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/launcher.c
9 : : *
10 : : * NOTES
11 : : * This module contains the logical replication worker launcher which
12 : : * uses the background worker infrastructure to start the logical
13 : : * replication workers for every enabled subscription.
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : :
18 : : #include "postgres.h"
19 : :
20 : : #include "access/heapam.h"
21 : : #include "access/htup.h"
22 : : #include "access/htup_details.h"
23 : : #include "access/tableam.h"
24 : : #include "access/xact.h"
25 : : #include "catalog/pg_subscription.h"
26 : : #include "catalog/pg_subscription_rel.h"
27 : : #include "funcapi.h"
28 : : #include "lib/dshash.h"
29 : : #include "miscadmin.h"
30 : : #include "pgstat.h"
31 : : #include "postmaster/bgworker.h"
32 : : #include "postmaster/interrupt.h"
33 : : #include "replication/logicallauncher.h"
34 : : #include "replication/origin.h"
35 : : #include "replication/slot.h"
36 : : #include "replication/walreceiver.h"
37 : : #include "replication/worker_internal.h"
38 : : #include "storage/ipc.h"
39 : : #include "storage/proc.h"
40 : : #include "storage/procarray.h"
41 : : #include "tcop/tcopprot.h"
42 : : #include "utils/builtins.h"
43 : : #include "utils/memutils.h"
44 : : #include "utils/pg_lsn.h"
45 : : #include "utils/snapmgr.h"
46 : : #include "utils/syscache.h"
47 : :
48 : : /* max sleep time between cycles (3min) */
49 : : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
50 : :
51 : : /* GUC variables */
52 : : int max_logical_replication_workers = 4;
53 : : int max_sync_workers_per_subscription = 2;
54 : : int max_parallel_apply_workers_per_subscription = 2;
55 : :
56 : : LogicalRepWorker *MyLogicalRepWorker = NULL;
57 : :
58 : : typedef struct LogicalRepCtxStruct
59 : : {
60 : : /* Supervisor process. */
61 : : pid_t launcher_pid;
62 : :
63 : : /* Hash table holding last start times of subscriptions' apply workers. */
64 : : dsa_handle last_start_dsa;
65 : : dshash_table_handle last_start_dsh;
66 : :
67 : : /* Background workers. */
68 : : LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
69 : : } LogicalRepCtxStruct;
70 : :
71 : : static LogicalRepCtxStruct *LogicalRepCtx;
72 : :
73 : : /* an entry in the last-start-times shared hash table */
74 : : typedef struct LauncherLastStartTimesEntry
75 : : {
76 : : Oid subid; /* OID of logrep subscription (hash key) */
77 : : TimestampTz last_start_time; /* last time its apply worker was started */
78 : : } LauncherLastStartTimesEntry;
79 : :
80 : : /* parameters for the last-start-times shared hash table */
81 : : static const dshash_parameters dsh_params = {
82 : : sizeof(Oid),
83 : : sizeof(LauncherLastStartTimesEntry),
84 : : dshash_memcmp,
85 : : dshash_memhash,
86 : : dshash_memcpy,
87 : : LWTRANCHE_LAUNCHER_HASH
88 : : };
89 : :
90 : : static dsa_area *last_start_times_dsa = NULL;
91 : : static dshash_table *last_start_times = NULL;
92 : :
93 : : static bool on_commit_launcher_wakeup = false;
94 : :
95 : :
96 : : static void logicalrep_launcher_onexit(int code, Datum arg);
97 : : static void logicalrep_worker_onexit(int code, Datum arg);
98 : : static void logicalrep_worker_detach(void);
99 : : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100 : : static int logicalrep_pa_worker_count(Oid subid);
101 : : static void logicalrep_launcher_attach_dshmem(void);
102 : : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103 : : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
104 : : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
105 : : static bool acquire_conflict_slot_if_exists(void);
106 : : static void update_conflict_slot_xmin(TransactionId new_xmin);
107 : : static void init_conflict_slot_xmin(void);
108 : :
109 : :
110 : : /*
111 : : * Load the list of subscriptions.
112 : : *
113 : : * Only the fields interesting for worker start/stop functions are filled for
114 : : * each subscription.
115 : : */
116 : : static List *
3152 peter_e@gmx.net 117 :CBC 2669 : get_subscription_list(void)
118 : : {
119 : 2669 : List *res = NIL;
120 : : Relation rel;
121 : : TableScanDesc scan;
122 : : HeapTuple tup;
123 : : MemoryContext resultcxt;
124 : :
125 : : /* This is the context that we will allocate our output data in */
126 : 2669 : resultcxt = CurrentMemoryContext;
127 : :
128 : : /*
129 : : * Start a transaction so we can access pg_subscription.
130 : : */
131 : 2669 : StartTransactionCommand();
132 : :
2420 andres@anarazel.de 133 : 2669 : rel = table_open(SubscriptionRelationId, AccessShareLock);
2371 134 : 2669 : scan = table_beginscan_catalog(rel, 0, NULL);
135 : :
3152 peter_e@gmx.net 136 [ + + ]: 3491 : while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
137 : : {
138 : 822 : Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
139 : : Subscription *sub;
140 : : MemoryContext oldcxt;
141 : :
142 : : /*
143 : : * Allocate our results in the caller's context, not the
144 : : * transaction's. We do this inside the loop, and restore the original
145 : : * context at the end, so that leaky things like heap_getnext() are
146 : : * not called in a potentially long-lived context.
147 : : */
148 : 822 : oldcxt = MemoryContextSwitchTo(resultcxt);
149 : :
3067 150 : 822 : sub = (Subscription *) palloc0(sizeof(Subscription));
2482 andres@anarazel.de 151 : 822 : sub->oid = subform->oid;
3152 peter_e@gmx.net 152 : 822 : sub->dbid = subform->subdbid;
153 : 822 : sub->owner = subform->subowner;
154 : 822 : sub->enabled = subform->subenabled;
155 : 822 : sub->name = pstrdup(NameStr(subform->subname));
45 akapila@postgresql.o 156 :GNC 822 : sub->retaindeadtuples = subform->subretaindeadtuples;
4 157 : 822 : sub->retentionactive = subform->subretentionactive;
158 : : /* We don't fill fields we are not interested in. */
159 : :
3152 peter_e@gmx.net 160 :CBC 822 : res = lappend(res, sub);
161 : 822 : MemoryContextSwitchTo(oldcxt);
162 : : }
163 : :
2371 andres@anarazel.de 164 : 2669 : table_endscan(scan);
2420 165 : 2669 : table_close(rel, AccessShareLock);
166 : :
3152 peter_e@gmx.net 167 : 2669 : CommitTransactionCommand();
168 : :
169 : 2669 : return res;
170 : : }
171 : :
172 : : /*
173 : : * Wait for a background worker to start up and attach to the shmem context.
174 : : *
175 : : * This is only needed for cleaning up the shared memory in case the worker
176 : : * fails to attach.
177 : : *
178 : : * Returns whether the attach was successful.
179 : : */
180 : : static bool
181 : 410 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
182 : : uint16 generation,
183 : : BackgroundWorkerHandle *handle)
184 : : {
74 tgl@sss.pgh.pa.us 185 : 410 : bool result = false;
186 : 410 : bool dropped_latch = false;
187 : :
188 : : for (;;)
3152 peter_e@gmx.net 189 : 1113 : {
190 : : BgwHandleStatus status;
191 : : pid_t pid;
192 : : int rc;
193 : :
194 [ + + ]: 1523 : CHECK_FOR_INTERRUPTS();
195 : :
3055 196 : 1522 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
197 : :
198 : : /* Worker either died or has started. Return false if died. */
199 [ + - + + ]: 1522 : if (!worker->in_use || worker->proc)
200 : : {
74 tgl@sss.pgh.pa.us 201 : 405 : result = worker->in_use;
3055 peter_e@gmx.net 202 : 405 : LWLockRelease(LogicalRepWorkerLock);
74 tgl@sss.pgh.pa.us 203 : 405 : break;
204 : : }
205 : :
3055 peter_e@gmx.net 206 : 1117 : LWLockRelease(LogicalRepWorkerLock);
207 : :
208 : : /* Check if worker has died before attaching, and clean up after it. */
3152 209 : 1117 : status = GetBackgroundWorkerPid(handle, &pid);
210 : :
211 [ - + ]: 1117 : if (status == BGWH_STOPPED)
212 : : {
3055 peter_e@gmx.net 213 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
214 : : /* Ensure that this was indeed the worker we waited for. */
215 [ # # ]: 0 : if (generation == worker->generation)
216 : 0 : logicalrep_worker_cleanup(worker);
217 : 0 : LWLockRelease(LogicalRepWorkerLock);
74 tgl@sss.pgh.pa.us 218 : 0 : break; /* result is already false */
219 : : }
220 : :
221 : : /*
222 : : * We need timeout because we generally don't get notified via latch
223 : : * about the worker attach. But we don't expect to have to wait long.
224 : : */
3152 peter_e@gmx.net 225 :CBC 1117 : rc = WaitLatch(MyLatch,
226 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
227 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
228 : :
3014 andres@anarazel.de 229 [ + + ]: 1117 : if (rc & WL_LATCH_SET)
230 : : {
231 : 467 : ResetLatch(MyLatch);
232 [ + + ]: 467 : CHECK_FOR_INTERRUPTS();
74 tgl@sss.pgh.pa.us 233 : 463 : dropped_latch = true;
234 : : }
235 : : }
236 : :
237 : : /*
238 : : * If we had to clear a latch event in order to wait, be sure to restore
239 : : * it before exiting. Otherwise caller may miss events.
240 : : */
241 [ + - ]: 405 : if (dropped_latch)
242 : 405 : SetLatch(MyLatch);
243 : :
244 : 405 : return result;
245 : : }
246 : :
247 : : /*
248 : : * Walks the workers array and searches for one that matches given
249 : : * subscription id and relid.
250 : : *
251 : : * We are only interested in the leader apply worker or table sync worker.
252 : : */
253 : : LogicalRepWorker *
3089 peter_e@gmx.net 254 : 3120 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
255 : : {
256 : : int i;
3034 bruce@momjian.us 257 : 3120 : LogicalRepWorker *res = NULL;
258 : :
3152 peter_e@gmx.net 259 [ - + ]: 3120 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
260 : :
261 : : /* Search for attached worker for a given subscription id. */
262 [ + + ]: 9624 : for (i = 0; i < max_logical_replication_workers; i++)
263 : : {
3034 bruce@momjian.us 264 : 8402 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
265 : :
266 : : /* Skip parallel apply workers. */
971 akapila@postgresql.o 267 [ + + - + ]: 8402 : if (isParallelApplyWorker(w))
971 akapila@postgresql.o 268 :UBC 0 : continue;
269 : :
3055 peter_e@gmx.net 270 [ + + + + :CBC 8402 : if (w->in_use && w->subid == subid && w->relid == relid &&
+ + ]
271 [ + + + - ]: 1898 : (!only_running || w->proc))
272 : : {
3152 273 : 1898 : res = w;
274 : 1898 : break;
275 : : }
276 : : }
277 : :
278 : 3120 : return res;
279 : : }
280 : :
281 : : /*
282 : : * Similar to logicalrep_worker_find(), but returns a list of all workers for
283 : : * the subscription, instead of just one.
284 : : */
285 : : List *
409 akapila@postgresql.o 286 : 647 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
287 : : {
288 : : int i;
2955 peter_e@gmx.net 289 : 647 : List *res = NIL;
290 : :
409 akapila@postgresql.o 291 [ + + ]: 647 : if (acquire_lock)
292 : 117 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
293 : :
2955 peter_e@gmx.net 294 [ - + ]: 647 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
295 : :
296 : : /* Search for attached worker for a given subscription id. */
297 [ + + ]: 3363 : for (i = 0; i < max_logical_replication_workers; i++)
298 : : {
299 : 2716 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
300 : :
301 [ + + + + : 2716 : if (w->in_use && w->subid == subid && (!only_running || w->proc))
+ + + + ]
302 : 462 : res = lappend(res, w);
303 : : }
304 : :
409 akapila@postgresql.o 305 [ + + ]: 647 : if (acquire_lock)
306 : 117 : LWLockRelease(LogicalRepWorkerLock);
307 : :
2955 peter_e@gmx.net 308 : 647 : return res;
309 : : }
310 : :
311 : : /*
312 : : * Start new logical replication background worker, if possible.
313 : : *
314 : : * Returns true on success, false on failure.
315 : : */
316 : : bool
754 akapila@postgresql.o 317 : 410 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
318 : : Oid dbid, Oid subid, const char *subname, Oid userid,
319 : : Oid relid, dsm_handle subworker_dsm,
320 : : bool retain_dead_tuples)
321 : : {
322 : : BackgroundWorker bgw;
323 : : BackgroundWorkerHandle *bgw_handle;
324 : : uint16 generation;
325 : : int i;
3034 bruce@momjian.us 326 : 410 : int slot = 0;
327 : 410 : LogicalRepWorker *worker = NULL;
328 : : int nsyncworkers;
329 : : int nparallelapplyworkers;
330 : : TimestampTz now;
754 akapila@postgresql.o 331 : 410 : bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
332 : 410 : bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
333 : :
334 : : /*----------
335 : : * Sanity checks:
336 : : * - must be valid worker type
337 : : * - tablesync workers are only ones to have relid
338 : : * - parallel apply worker is the only kind of subworker
339 : : * - The replication slot used in conflict detection is created when
340 : : * retain_dead_tuples is enabled
341 : : */
342 [ - + ]: 410 : Assert(wtype != WORKERTYPE_UNKNOWN);
343 [ - + ]: 410 : Assert(is_tablesync_worker == OidIsValid(relid));
344 [ - + ]: 410 : Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
45 akapila@postgresql.o 345 [ + + - + ]:GNC 410 : Assert(!retain_dead_tuples || MyReplicationSlot);
346 : :
3027 peter_e@gmx.net 347 [ + + ]:CBC 410 : ereport(DEBUG1,
348 : : (errmsg_internal("starting logical replication worker for subscription \"%s\"",
349 : : subname)));
350 : :
351 : : /* Report this after the initial starting message for consistency. */
169 msawada@postgresql.o 352 [ - + ]: 410 : if (max_active_replication_origins == 0)
3152 peter_e@gmx.net 353 [ # # ]:UBC 0 : ereport(ERROR,
354 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
355 : : errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
356 : :
357 : : /*
358 : : * We need to do the modification of the shared memory under lock so that
359 : : * we have consistent view.
360 : : */
3152 peter_e@gmx.net 361 :CBC 410 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
362 : :
3055 363 : 410 : retry:
364 : : /* Find unused worker slot. */
365 [ + - ]: 707 : for (i = 0; i < max_logical_replication_workers; i++)
366 : : {
367 : 707 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
368 : :
369 [ + + ]: 707 : if (!w->in_use)
370 : : {
371 : 410 : worker = w;
372 : 410 : slot = i;
3152 373 : 410 : break;
374 : : }
375 : : }
376 : :
3055 377 : 410 : nsyncworkers = logicalrep_sync_worker_count(subid);
378 : :
379 : 410 : now = GetCurrentTimestamp();
380 : :
381 : : /*
382 : : * If we didn't find a free slot, try to do garbage collection. The
383 : : * reason we do this is because if some worker failed to start up and its
384 : : * parent has crashed while waiting, the in_use state was never cleared.
385 : : */
386 [ + - - + ]: 410 : if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
387 : : {
3034 bruce@momjian.us 388 :UBC 0 : bool did_cleanup = false;
389 : :
3055 peter_e@gmx.net 390 [ # # ]: 0 : for (i = 0; i < max_logical_replication_workers; i++)
391 : : {
392 : 0 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
393 : :
394 : : /*
395 : : * If the worker was marked in use but didn't manage to attach in
396 : : * time, clean it up.
397 : : */
398 [ # # # # : 0 : if (w->in_use && !w->proc &&
# # ]
399 : 0 : TimestampDifferenceExceeds(w->launch_time, now,
400 : : wal_receiver_timeout))
401 : : {
402 [ # # ]: 0 : elog(WARNING,
403 : : "logical replication worker for subscription %u took too long to start; canceled",
404 : : w->subid);
405 : :
406 : 0 : logicalrep_worker_cleanup(w);
407 : 0 : did_cleanup = true;
408 : : }
409 : : }
410 : :
411 [ # # ]: 0 : if (did_cleanup)
412 : 0 : goto retry;
413 : : }
414 : :
415 : : /*
416 : : * We don't allow to invoke more sync workers once we have reached the
417 : : * sync worker limit per subscription. So, just return silently as we
418 : : * might get here because of an otherwise harmless race condition.
419 : : */
754 akapila@postgresql.o 420 [ + + - + ]:CBC 410 : if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
421 : : {
3055 peter_e@gmx.net 422 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
971 akapila@postgresql.o 423 : 0 : return false;
424 : : }
425 : :
971 akapila@postgresql.o 426 :CBC 410 : nparallelapplyworkers = logicalrep_pa_worker_count(subid);
427 : :
428 : : /*
429 : : * Return false if the number of parallel apply workers reached the limit
430 : : * per subscription.
431 : : */
432 [ + + ]: 410 : if (is_parallel_apply_worker &&
433 [ - + ]: 10 : nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
434 : : {
971 akapila@postgresql.o 435 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
436 : 0 : return false;
437 : : }
438 : :
439 : : /*
440 : : * However if there are no more free worker slots, inform user about it
441 : : * before exiting.
442 : : */
3152 peter_e@gmx.net 443 [ - + ]:CBC 410 : if (worker == NULL)
444 : : {
3147 fujii@postgresql.org 445 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3152 peter_e@gmx.net 446 [ # # ]: 0 : ereport(WARNING,
447 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
448 : : errmsg("out of logical replication worker slots"),
449 : : errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
971 akapila@postgresql.o 450 : 0 : return false;
451 : : }
452 : :
453 : : /* Prepare the worker slot. */
754 akapila@postgresql.o 454 :CBC 410 : worker->type = wtype;
3055 peter_e@gmx.net 455 : 410 : worker->launch_time = now;
456 : 410 : worker->in_use = true;
457 : 410 : worker->generation++;
3089 458 : 410 : worker->proc = NULL;
3152 459 : 410 : worker->dbid = dbid;
460 : 410 : worker->userid = userid;
461 : 410 : worker->subid = subid;
3089 462 : 410 : worker->relid = relid;
463 : 410 : worker->relstate = SUBREL_STATE_UNKNOWN;
464 : 410 : worker->relstate_lsn = InvalidXLogRecPtr;
1465 akapila@postgresql.o 465 : 410 : worker->stream_fileset = NULL;
962 466 [ + + ]: 410 : worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
971 467 : 410 : worker->parallel_apply = is_parallel_apply_worker;
45 akapila@postgresql.o 468 :GNC 410 : worker->oldest_nonremovable_xid = retain_dead_tuples
469 : 1 : ? MyReplicationSlot->data.xmin
470 [ + + ]: 410 : : InvalidTransactionId;
3089 peter_e@gmx.net 471 :CBC 410 : worker->last_lsn = InvalidXLogRecPtr;
472 : 410 : TIMESTAMP_NOBEGIN(worker->last_send_time);
473 : 410 : TIMESTAMP_NOBEGIN(worker->last_recv_time);
474 : 410 : worker->reply_lsn = InvalidXLogRecPtr;
475 : 410 : TIMESTAMP_NOBEGIN(worker->reply_time);
476 : :
477 : : /* Before releasing lock, remember generation for future identification. */
2910 tgl@sss.pgh.pa.us 478 : 410 : generation = worker->generation;
479 : :
3152 peter_e@gmx.net 480 : 410 : LWLockRelease(LogicalRepWorkerLock);
481 : :
482 : : /* Register the new dynamic worker. */
3065 tgl@sss.pgh.pa.us 483 : 410 : memset(&bgw, 0, sizeof(bgw));
3034 bruce@momjian.us 484 : 410 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
485 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3152 peter_e@gmx.net 486 : 410 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
796 nathan@postgresql.or 487 : 410 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
488 : :
746 akapila@postgresql.o 489 [ + + + - : 410 : switch (worker->type)
- ]
490 : : {
491 : 204 : case WORKERTYPE_APPLY:
492 : 204 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
493 : 204 : snprintf(bgw.bgw_name, BGW_MAXLEN,
494 : : "logical replication apply worker for subscription %u",
495 : : subid);
496 : 204 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
497 : 204 : break;
498 : :
499 : 10 : case WORKERTYPE_PARALLEL_APPLY:
500 : 10 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
501 : 10 : snprintf(bgw.bgw_name, BGW_MAXLEN,
502 : : "logical replication parallel apply worker for subscription %u",
503 : : subid);
504 : 10 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
505 : :
506 : 10 : memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
507 : 10 : break;
508 : :
509 : 196 : case WORKERTYPE_TABLESYNC:
510 : 196 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
511 : 196 : snprintf(bgw.bgw_name, BGW_MAXLEN,
512 : : "logical replication tablesync worker for subscription %u sync %u",
513 : : subid,
514 : : relid);
515 : 196 : snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
516 : 196 : break;
517 : :
746 akapila@postgresql.o 518 :UBC 0 : case WORKERTYPE_UNKNOWN:
519 : : /* Should never happen. */
520 [ # # ]: 0 : elog(ERROR, "unknown worker type");
521 : : }
522 : :
3152 peter_e@gmx.net 523 :CBC 410 : bgw.bgw_restart_time = BGW_NEVER_RESTART;
524 : 410 : bgw.bgw_notify_pid = MyProcPid;
3062 fujii@postgresql.org 525 : 410 : bgw.bgw_main_arg = Int32GetDatum(slot);
526 : :
3152 peter_e@gmx.net 527 [ - + ]: 410 : if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
528 : : {
529 : : /* Failed to start worker, so clean up the worker slot. */
2910 tgl@sss.pgh.pa.us 530 :UBC 0 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
531 [ # # ]: 0 : Assert(generation == worker->generation);
532 : 0 : logicalrep_worker_cleanup(worker);
533 : 0 : LWLockRelease(LogicalRepWorkerLock);
534 : :
3152 peter_e@gmx.net 535 [ # # ]: 0 : ereport(WARNING,
536 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
537 : : errmsg("out of background worker slots"),
538 : : errhint("You might need to increase \"%s\".", "max_worker_processes")));
971 akapila@postgresql.o 539 : 0 : return false;
540 : : }
541 : :
542 : : /* Now wait until it attaches. */
971 akapila@postgresql.o 543 :CBC 410 : return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
544 : : }
545 : :
546 : : /*
547 : : * Internal function to stop the worker and wait until it detaches from the
548 : : * slot.
549 : : */
550 : : static void
551 : 83 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
552 : : {
553 : : uint16 generation;
554 : :
555 [ - + ]: 83 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
556 : :
557 : : /*
558 : : * Remember which generation was our worker so we can check if what we see
559 : : * is still the same one.
560 : : */
3055 peter_e@gmx.net 561 : 83 : generation = worker->generation;
562 : :
563 : : /*
564 : : * If we found a worker but it does not have proc set then it is still
565 : : * starting up; wait for it to finish starting and then kill it.
566 : : */
567 [ + - + + ]: 83 : while (worker->in_use && !worker->proc)
568 : : {
569 : : int rc;
570 : :
3152 571 : 3 : LWLockRelease(LogicalRepWorkerLock);
572 : :
573 : : /* Wait a bit --- we don't expect to have to wait long. */
3014 andres@anarazel.de 574 : 3 : rc = WaitLatch(MyLatch,
575 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
576 : : 10L, WAIT_EVENT_BGWORKER_STARTUP);
577 : :
578 [ - + ]: 3 : if (rc & WL_LATCH_SET)
579 : : {
3014 andres@anarazel.de 580 :UBC 0 : ResetLatch(MyLatch);
581 [ # # ]: 0 : CHECK_FOR_INTERRUPTS();
582 : : }
583 : :
584 : : /* Recheck worker status. */
3152 peter_e@gmx.net 585 :CBC 3 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
586 : :
587 : : /*
588 : : * Check whether the worker slot is no longer used, which would mean
589 : : * that the worker has exited, or whether the worker generation is
590 : : * different, meaning that a different worker has taken the slot.
591 : : */
3055 592 [ + - - + ]: 3 : if (!worker->in_use || worker->generation != generation)
3148 peter_e@gmx.net 593 :UBC 0 : return;
594 : :
595 : : /* Worker has assigned proc, so it has started. */
3148 peter_e@gmx.net 596 [ + - ]:CBC 3 : if (worker->proc)
3152 597 : 3 : break;
598 : : }
599 : :
600 : : /* Now terminate the worker ... */
971 akapila@postgresql.o 601 : 83 : kill(worker->proc->pid, signo);
602 : :
603 : : /* ... and wait for it to die. */
604 : : for (;;)
3152 peter_e@gmx.net 605 : 107 : {
606 : : int rc;
607 : :
608 : : /* is it gone? */
3055 609 [ + + + + ]: 190 : if (!worker->proc || worker->generation != generation)
610 : : break;
611 : :
2989 tgl@sss.pgh.pa.us 612 : 107 : LWLockRelease(LogicalRepWorkerLock);
613 : :
614 : : /* Wait a bit --- we don't expect to have to wait long. */
3014 andres@anarazel.de 615 : 107 : rc = WaitLatch(MyLatch,
616 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
617 : : 10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
618 : :
619 [ + + ]: 107 : if (rc & WL_LATCH_SET)
620 : : {
621 : 33 : ResetLatch(MyLatch);
622 [ + + ]: 33 : CHECK_FOR_INTERRUPTS();
623 : : }
624 : :
2989 tgl@sss.pgh.pa.us 625 : 107 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626 : : }
627 : : }
628 : :
629 : : /*
630 : : * Stop the logical replication worker for subid/relid, if any.
631 : : */
632 : : void
971 akapila@postgresql.o 633 : 94 : logicalrep_worker_stop(Oid subid, Oid relid)
634 : : {
635 : : LogicalRepWorker *worker;
636 : :
637 : 94 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
638 : :
639 : 94 : worker = logicalrep_worker_find(subid, relid, false);
640 : :
641 [ + + ]: 94 : if (worker)
642 : : {
643 [ + - - + ]: 74 : Assert(!isParallelApplyWorker(worker));
644 : 74 : logicalrep_worker_stop_internal(worker, SIGTERM);
645 : : }
646 : :
647 : 94 : LWLockRelease(LogicalRepWorkerLock);
648 : 94 : }
649 : :
650 : : /*
651 : : * Stop the given logical replication parallel apply worker.
652 : : *
653 : : * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
654 : : * worker so that the worker exits cleanly.
655 : : */
656 : : void
851 657 : 5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
658 : : {
659 : : int slot_no;
660 : : uint16 generation;
661 : : LogicalRepWorker *worker;
662 : :
663 [ - + ]: 5 : SpinLockAcquire(&winfo->shared->mutex);
664 : 5 : generation = winfo->shared->logicalrep_worker_generation;
665 : 5 : slot_no = winfo->shared->logicalrep_worker_slot_no;
666 : 5 : SpinLockRelease(&winfo->shared->mutex);
667 : :
971 668 [ + - - + ]: 5 : Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
669 : :
670 : : /*
671 : : * Detach from the error_mq_handle for the parallel apply worker before
672 : : * stopping it. This prevents the leader apply worker from trying to
673 : : * receive the message from the error queue that might already be detached
674 : : * by the parallel apply worker.
675 : : */
851 676 [ + - ]: 5 : if (winfo->error_mq_handle)
677 : : {
678 : 5 : shm_mq_detach(winfo->error_mq_handle);
679 : 5 : winfo->error_mq_handle = NULL;
680 : : }
681 : :
971 682 : 5 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
683 : :
684 : 5 : worker = &LogicalRepCtx->workers[slot_no];
685 [ + - - + ]: 5 : Assert(isParallelApplyWorker(worker));
686 : :
687 : : /*
688 : : * Only stop the worker if the generation matches and the worker is alive.
689 : : */
690 [ + - + - ]: 5 : if (worker->generation == generation && worker->proc)
691 : 5 : logicalrep_worker_stop_internal(worker, SIGINT);
692 : :
2989 tgl@sss.pgh.pa.us 693 : 5 : LWLockRelease(LogicalRepWorkerLock);
3152 peter_e@gmx.net 694 : 5 : }
695 : :
696 : : /*
697 : : * Wake up (using latch) any logical replication worker for specified sub/rel.
698 : : */
699 : : void
3089 700 : 210 : logicalrep_worker_wakeup(Oid subid, Oid relid)
701 : : {
702 : : LogicalRepWorker *worker;
703 : :
704 : 210 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
705 : :
706 : 210 : worker = logicalrep_worker_find(subid, relid, true);
707 : :
708 [ + - ]: 210 : if (worker)
709 : 210 : logicalrep_worker_wakeup_ptr(worker);
710 : :
2990 tgl@sss.pgh.pa.us 711 : 210 : LWLockRelease(LogicalRepWorkerLock);
3089 peter_e@gmx.net 712 : 210 : }
713 : :
714 : : /*
715 : : * Wake up (using latch) the specified logical replication worker.
716 : : *
717 : : * Caller must hold lock, else worker->proc could change under us.
718 : : */
719 : : void
720 : 638 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
721 : : {
2990 tgl@sss.pgh.pa.us 722 [ - + ]: 638 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
723 : :
3089 peter_e@gmx.net 724 : 638 : SetLatch(&worker->proc->procLatch);
725 : 638 : }
726 : :
727 : : /*
728 : : * Attach to a slot.
729 : : */
730 : : void
3152 731 : 528 : logicalrep_worker_attach(int slot)
732 : : {
733 : : /* Block concurrent access. */
734 : 528 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
735 : :
736 [ + - - + ]: 528 : Assert(slot >= 0 && slot < max_logical_replication_workers);
737 : 528 : MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
738 : :
3055 739 [ - + ]: 528 : if (!MyLogicalRepWorker->in_use)
740 : : {
3055 peter_e@gmx.net 741 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
742 [ # # ]: 0 : ereport(ERROR,
743 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
744 : : errmsg("logical replication worker slot %d is empty, cannot attach",
745 : : slot)));
746 : : }
747 : :
3152 peter_e@gmx.net 748 [ - + ]:CBC 528 : if (MyLogicalRepWorker->proc)
749 : : {
3055 peter_e@gmx.net 750 :UBC 0 : LWLockRelease(LogicalRepWorkerLock);
3152 751 [ # # ]: 0 : ereport(ERROR,
752 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
753 : : errmsg("logical replication worker slot %d is already used by "
754 : : "another worker, cannot attach", slot)));
755 : : }
756 : :
3152 peter_e@gmx.net 757 :CBC 528 : MyLogicalRepWorker->proc = MyProc;
758 : 528 : before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
759 : :
760 : 528 : LWLockRelease(LogicalRepWorkerLock);
761 : 528 : }
762 : :
763 : : /*
764 : : * Stop the parallel apply workers if any, and detach the leader apply worker
765 : : * (cleans up the worker info).
766 : : */
767 : : static void
768 : 528 : logicalrep_worker_detach(void)
769 : : {
770 : : /* Stop the parallel apply workers. */
971 akapila@postgresql.o 771 [ + + ]: 528 : if (am_leader_apply_worker())
772 : : {
773 : : List *workers;
774 : : ListCell *lc;
775 : :
776 : : /*
777 : : * Detach from the error_mq_handle for all parallel apply workers
778 : : * before terminating them. This prevents the leader apply worker from
779 : : * receiving the worker termination message and sending it to logs
780 : : * when the same is already done by the parallel worker.
781 : : */
782 : 320 : pa_detach_all_error_mq();
783 : :
784 : 320 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
785 : :
409 786 : 320 : workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
971 787 [ + - + + : 645 : foreach(lc, workers)
+ + ]
788 : : {
789 : 325 : LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
790 : :
791 [ + - + + ]: 325 : if (isParallelApplyWorker(w))
792 : 4 : logicalrep_worker_stop_internal(w, SIGTERM);
793 : : }
794 : :
795 : 320 : LWLockRelease(LogicalRepWorkerLock);
796 : :
35 tgl@sss.pgh.pa.us 797 :GNC 320 : list_free(workers);
798 : : }
799 : :
800 : : /* Block concurrent access. */
3152 peter_e@gmx.net 801 :CBC 528 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
802 : :
3055 803 : 528 : logicalrep_worker_cleanup(MyLogicalRepWorker);
804 : :
3152 805 : 528 : LWLockRelease(LogicalRepWorkerLock);
806 : 528 : }
807 : :
808 : : /*
809 : : * Clean up worker info.
810 : : */
811 : : static void
3055 812 : 528 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
813 : : {
814 [ - + ]: 528 : Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
815 : :
743 akapila@postgresql.o 816 : 528 : worker->type = WORKERTYPE_UNKNOWN;
3055 peter_e@gmx.net 817 : 528 : worker->in_use = false;
818 : 528 : worker->proc = NULL;
819 : 528 : worker->dbid = InvalidOid;
820 : 528 : worker->userid = InvalidOid;
821 : 528 : worker->subid = InvalidOid;
822 : 528 : worker->relid = InvalidOid;
962 akapila@postgresql.o 823 : 528 : worker->leader_pid = InvalidPid;
971 824 : 528 : worker->parallel_apply = false;
3055 peter_e@gmx.net 825 : 528 : }
826 : :
827 : : /*
828 : : * Cleanup function for logical replication launcher.
829 : : *
830 : : * Called on logical replication launcher exit.
831 : : */
832 : : static void
3062 fujii@postgresql.org 833 : 387 : logicalrep_launcher_onexit(int code, Datum arg)
834 : : {
835 : 387 : LogicalRepCtx->launcher_pid = 0;
836 : 387 : }
837 : :
838 : : /*
839 : : * Cleanup function.
840 : : *
841 : : * Called on logical replication worker exit.
842 : : */
843 : : static void
3152 peter_e@gmx.net 844 : 528 : logicalrep_worker_onexit(int code, Datum arg)
845 : : {
846 : : /* Disconnect gracefully from the remote side. */
1578 alvherre@alvh.no-ip. 847 [ + + ]: 528 : if (LogRepWorkerWalRcvConn)
848 : 413 : walrcv_disconnect(LogRepWorkerWalRcvConn);
849 : :
3152 peter_e@gmx.net 850 : 528 : logicalrep_worker_detach();
851 : :
852 : : /* Cleanup fileset used for streaming transactions. */
1465 akapila@postgresql.o 853 [ + + ]: 528 : if (MyLogicalRepWorker->stream_fileset != NULL)
854 : 14 : FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
855 : :
856 : : /*
857 : : * Session level locks may be acquired outside of a transaction in
858 : : * parallel apply mode and will not be released when the worker
859 : : * terminates, so manually release all locks before the worker exits.
860 : : *
861 : : * The locks will be acquired once the worker is initialized.
862 : : */
857 863 [ + + ]: 528 : if (!InitializingApplyWorker)
864 : 465 : LockReleaseAll(DEFAULT_LOCKMETHOD, true);
865 : :
3019 peter_e@gmx.net 866 : 528 : ApplyLauncherWakeup();
3152 867 : 528 : }
868 : :
869 : : /*
870 : : * Count the number of registered (not necessarily running) sync workers
871 : : * for a subscription.
872 : : */
873 : : int
3089 874 : 1313 : logicalrep_sync_worker_count(Oid subid)
875 : : {
876 : : int i;
3034 bruce@momjian.us 877 : 1313 : int res = 0;
878 : :
3089 peter_e@gmx.net 879 [ - + ]: 1313 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
880 : :
881 : : /* Search for attached worker for a given subscription id. */
882 [ + + ]: 6771 : for (i = 0; i < max_logical_replication_workers; i++)
883 : : {
3034 bruce@momjian.us 884 : 5458 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
885 : :
743 akapila@postgresql.o 886 [ + + + + : 5458 : if (isTablesyncWorker(w) && w->subid == subid)
+ + ]
3089 peter_e@gmx.net 887 : 1541 : res++;
888 : : }
889 : :
890 : 1313 : return res;
891 : : }
892 : :
893 : : /*
894 : : * Count the number of registered (but not necessarily running) parallel apply
895 : : * workers for a subscription.
896 : : */
897 : : static int
971 akapila@postgresql.o 898 : 410 : logicalrep_pa_worker_count(Oid subid)
899 : : {
900 : : int i;
901 : 410 : int res = 0;
902 : :
903 [ - + ]: 410 : Assert(LWLockHeldByMe(LogicalRepWorkerLock));
904 : :
905 : : /*
906 : : * Scan all attached parallel apply workers, only counting those which
907 : : * have the given subscription id.
908 : : */
909 [ + + ]: 2174 : for (i = 0; i < max_logical_replication_workers; i++)
910 : : {
911 : 1764 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
912 : :
743 913 [ + + + + : 1764 : if (isParallelApplyWorker(w) && w->subid == subid)
+ - ]
971 914 : 2 : res++;
915 : : }
916 : :
917 : 410 : return res;
918 : : }
919 : :
920 : : /*
921 : : * ApplyLauncherShmemSize
922 : : * Compute space needed for replication launcher shared memory
923 : : */
924 : : Size
3152 peter_e@gmx.net 925 : 3967 : ApplyLauncherShmemSize(void)
926 : : {
927 : : Size size;
928 : :
929 : : /*
930 : : * Need the fixed struct and the array of LogicalRepWorker.
931 : : */
932 : 3967 : size = sizeof(LogicalRepCtxStruct);
933 : 3967 : size = MAXALIGN(size);
934 : 3967 : size = add_size(size, mul_size(max_logical_replication_workers,
935 : : sizeof(LogicalRepWorker)));
936 : 3967 : return size;
937 : : }
938 : :
939 : : /*
940 : : * ApplyLauncherRegister
941 : : * Register a background worker running the logical replication launcher.
942 : : */
943 : : void
944 : 813 : ApplyLauncherRegister(void)
945 : : {
946 : : BackgroundWorker bgw;
947 : :
948 : : /*
949 : : * The logical replication launcher is disabled during binary upgrades, to
950 : : * prevent logical replication workers from running on the source cluster.
951 : : * That could cause replication origins to move forward after having been
952 : : * copied to the target cluster, potentially creating conflicts with the
953 : : * copied data files.
954 : : */
603 michael@paquier.xyz 955 [ + + + + ]: 813 : if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
3152 peter_e@gmx.net 956 : 53 : return;
957 : :
3065 tgl@sss.pgh.pa.us 958 : 760 : memset(&bgw, 0, sizeof(bgw));
3034 bruce@momjian.us 959 : 760 : bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
960 : : BGWORKER_BACKEND_DATABASE_CONNECTION;
3152 peter_e@gmx.net 961 : 760 : bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
796 nathan@postgresql.or 962 : 760 : snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
3081 rhaas@postgresql.org 963 : 760 : snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
3152 peter_e@gmx.net 964 : 760 : snprintf(bgw.bgw_name, BGW_MAXLEN,
965 : : "logical replication launcher");
2928 966 : 760 : snprintf(bgw.bgw_type, BGW_MAXLEN,
967 : : "logical replication launcher");
3152 968 : 760 : bgw.bgw_restart_time = 5;
969 : 760 : bgw.bgw_notify_pid = 0;
970 : 760 : bgw.bgw_main_arg = (Datum) 0;
971 : :
972 : 760 : RegisterBackgroundWorker(&bgw);
973 : : }
974 : :
975 : : /*
976 : : * ApplyLauncherShmemInit
977 : : * Allocate and initialize replication launcher shared memory
978 : : */
979 : : void
980 : 1029 : ApplyLauncherShmemInit(void)
981 : : {
982 : : bool found;
983 : :
984 : 1029 : LogicalRepCtx = (LogicalRepCtxStruct *)
985 : 1029 : ShmemInitStruct("Logical Replication Launcher Data",
986 : : ApplyLauncherShmemSize(),
987 : : &found);
988 : :
989 [ + - ]: 1029 : if (!found)
990 : : {
991 : : int slot;
992 : :
993 : 1029 : memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
994 : :
955 tgl@sss.pgh.pa.us 995 : 1029 : LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
996 : 1029 : LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
997 : :
998 : : /* Initialize memory and spin locks for each worker slot. */
3089 peter_e@gmx.net 999 [ + + ]: 5108 : for (slot = 0; slot < max_logical_replication_workers; slot++)
1000 : : {
1001 : 4079 : LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
1002 : :
1003 : 4079 : memset(worker, 0, sizeof(LogicalRepWorker));
1004 : 4079 : SpinLockInit(&worker->relmutex);
1005 : : }
1006 : : }
3152 1007 : 1029 : }
1008 : :
1009 : : /*
1010 : : * Initialize or attach to the dynamic shared hash table that stores the
1011 : : * last-start times, if not already done.
1012 : : * This must be called before accessing the table.
1013 : : */
1014 : : static void
958 tgl@sss.pgh.pa.us 1015 : 728 : logicalrep_launcher_attach_dshmem(void)
1016 : : {
1017 : : MemoryContext oldcontext;
1018 : :
1019 : : /* Quick exit if we already did this. */
955 1020 [ + + ]: 728 : if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
958 1021 [ + + ]: 675 : last_start_times != NULL)
1022 : 489 : return;
1023 : :
1024 : : /* Otherwise, use a lock to ensure only one process creates the table. */
1025 : 239 : LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
1026 : :
1027 : : /* Be sure any local memory allocated by DSA routines is persistent. */
1028 : 239 : oldcontext = MemoryContextSwitchTo(TopMemoryContext);
1029 : :
955 1030 [ + + ]: 239 : if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
1031 : : {
1032 : : /* Initialize dynamic shared hash table for last-start times. */
958 1033 : 53 : last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
1034 : 53 : dsa_pin(last_start_times_dsa);
1035 : 53 : dsa_pin_mapping(last_start_times_dsa);
558 nathan@postgresql.or 1036 : 53 : last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
1037 : :
1038 : : /* Store handles in shared memory for other backends to use. */
958 tgl@sss.pgh.pa.us 1039 : 53 : LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
1040 : 53 : LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
1041 : : }
1042 [ + - ]: 186 : else if (!last_start_times)
1043 : : {
1044 : : /* Attach to existing dynamic shared hash table. */
1045 : 186 : last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
1046 : 186 : dsa_pin_mapping(last_start_times_dsa);
1047 : 186 : last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
92 nathan@postgresql.or 1048 : 186 : LogicalRepCtx->last_start_dsh, NULL);
1049 : : }
1050 : :
958 tgl@sss.pgh.pa.us 1051 : 239 : MemoryContextSwitchTo(oldcontext);
1052 : 239 : LWLockRelease(LogicalRepWorkerLock);
1053 : : }
1054 : :
1055 : : /*
1056 : : * Set the last-start time for the subscription.
1057 : : */
1058 : : static void
1059 : 204 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
1060 : : {
1061 : : LauncherLastStartTimesEntry *entry;
1062 : : bool found;
1063 : :
1064 : 204 : logicalrep_launcher_attach_dshmem();
1065 : :
1066 : 204 : entry = dshash_find_or_insert(last_start_times, &subid, &found);
1067 : 204 : entry->last_start_time = start_time;
1068 : 204 : dshash_release_lock(last_start_times, entry);
1069 : 204 : }
1070 : :
1071 : : /*
1072 : : * Return the last-start time for the subscription, or 0 if there isn't one.
1073 : : */
1074 : : static TimestampTz
1075 : 299 : ApplyLauncherGetWorkerStartTime(Oid subid)
1076 : : {
1077 : : LauncherLastStartTimesEntry *entry;
1078 : : TimestampTz ret;
1079 : :
1080 : 299 : logicalrep_launcher_attach_dshmem();
1081 : :
1082 : 299 : entry = dshash_find(last_start_times, &subid, false);
1083 [ + + ]: 299 : if (entry == NULL)
1084 : 126 : return 0;
1085 : :
1086 : 173 : ret = entry->last_start_time;
1087 : 173 : dshash_release_lock(last_start_times, entry);
1088 : :
1089 : 173 : return ret;
1090 : : }
1091 : :
1092 : : /*
1093 : : * Remove the last-start-time entry for the subscription, if one exists.
1094 : : *
1095 : : * This has two use-cases: to remove the entry related to a subscription
1096 : : * that's been deleted or disabled (just to avoid leaking shared memory),
1097 : : * and to allow immediate restart of an apply worker that has exited
1098 : : * due to subscription parameter changes.
1099 : : */
1100 : : void
1101 : 225 : ApplyLauncherForgetWorkerStartTime(Oid subid)
1102 : : {
1103 : 225 : logicalrep_launcher_attach_dshmem();
1104 : :
1105 : 225 : (void) dshash_delete_key(last_start_times, &subid);
1106 : 225 : }
1107 : :
1108 : : /*
1109 : : * Wakeup the launcher on commit if requested.
1110 : : */
1111 : : void
3050 peter_e@gmx.net 1112 : 318579 : AtEOXact_ApplyLauncher(bool isCommit)
1113 : : {
2955 1114 [ + + ]: 318579 : if (isCommit)
1115 : : {
1116 [ + + ]: 293340 : if (on_commit_launcher_wakeup)
1117 : 140 : ApplyLauncherWakeup();
1118 : : }
1119 : :
3050 1120 : 318579 : on_commit_launcher_wakeup = false;
3152 1121 : 318579 : }
1122 : :
1123 : : /*
1124 : : * Request wakeup of the launcher on commit of the transaction.
1125 : : *
1126 : : * This is used to send launcher signal to stop sleeping and process the
1127 : : * subscriptions when current transaction commits. Should be used when new
1128 : : * tuple was added to the pg_subscription catalog.
1129 : : */
1130 : : void
1131 : 140 : ApplyLauncherWakeupAtCommit(void)
1132 : : {
3134 heikki.linnakangas@i 1133 [ + - ]: 140 : if (!on_commit_launcher_wakeup)
1134 : 140 : on_commit_launcher_wakeup = true;
3152 peter_e@gmx.net 1135 : 140 : }
1136 : :
1137 : : /*
1138 : : * Wakeup the launcher immediately.
1139 : : */
1140 : : void
1141 : 693 : ApplyLauncherWakeup(void)
1142 : : {
3062 fujii@postgresql.org 1143 [ + + ]: 693 : if (LogicalRepCtx->launcher_pid != 0)
3152 peter_e@gmx.net 1144 : 667 : kill(LogicalRepCtx->launcher_pid, SIGUSR1);
1145 : 693 : }
1146 : :
1147 : : /*
1148 : : * Main loop for the apply launcher process.
1149 : : */
1150 : : void
1151 : 387 : ApplyLauncherMain(Datum main_arg)
1152 : : {
3102 tgl@sss.pgh.pa.us 1153 [ + + ]: 387 : ereport(DEBUG1,
1154 : : (errmsg_internal("logical replication launcher started")));
1155 : :
3062 fujii@postgresql.org 1156 : 387 : before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
1157 : :
3012 andres@anarazel.de 1158 [ - + ]: 387 : Assert(LogicalRepCtx->launcher_pid == 0);
1159 : 387 : LogicalRepCtx->launcher_pid = MyProcPid;
1160 : :
1161 : : /* Establish signal handlers. */
1877 akapila@postgresql.o 1162 : 387 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
3012 andres@anarazel.de 1163 : 387 : pqsignal(SIGTERM, die);
3152 peter_e@gmx.net 1164 : 387 : BackgroundWorkerUnblockSignals();
1165 : :
1166 : : /*
1167 : : * Establish connection to nailed catalogs (we only ever access
1168 : : * pg_subscription).
1169 : : */
2711 magnus@hagander.net 1170 : 387 : BackgroundWorkerInitializeConnection(NULL, NULL, 0);
1171 : :
1172 : : /*
1173 : : * Acquire the conflict detection slot at startup to ensure it can be
1174 : : * dropped if no longer needed after a restart.
1175 : : */
45 akapila@postgresql.o 1176 :GNC 387 : acquire_conflict_slot_if_exists();
1177 : :
1178 : : /* Enter main loop */
1179 : : for (;;)
3152 peter_e@gmx.net 1180 :CBC 2283 : {
1181 : : int rc;
1182 : : List *sublist;
1183 : : ListCell *lc;
1184 : : MemoryContext subctx;
1185 : : MemoryContext oldctx;
3034 bruce@momjian.us 1186 : 2670 : long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
4 akapila@postgresql.o 1187 :GNC 2670 : bool can_update_xmin = true;
45 1188 : 2670 : bool retain_dead_tuples = false;
1189 : 2670 : TransactionId xmin = InvalidTransactionId;
1190 : :
3012 andres@anarazel.de 1191 [ + + ]:CBC 2670 : CHECK_FOR_INTERRUPTS();
1192 : :
1193 : : /* Use temporary context to avoid leaking memory across cycles. */
958 tgl@sss.pgh.pa.us 1194 : 2669 : subctx = AllocSetContextCreate(TopMemoryContext,
1195 : : "Logical Replication Launcher sublist",
1196 : : ALLOCSET_DEFAULT_SIZES);
1197 : 2669 : oldctx = MemoryContextSwitchTo(subctx);
1198 : :
1199 : : /*
1200 : : * Start any missing workers for enabled subscriptions.
1201 : : *
1202 : : * Also, during the iteration through all subscriptions, we compute
1203 : : * the minimum XID required to protect deleted tuples for conflict
1204 : : * detection if one of the subscription enables retain_dead_tuples
1205 : : * option.
1206 : : */
1207 : 2669 : sublist = get_subscription_list();
1208 [ + + + + : 3489 : foreach(lc, sublist)
+ + ]
1209 : : {
1210 : 822 : Subscription *sub = (Subscription *) lfirst(lc);
1211 : : LogicalRepWorker *w;
1212 : : TimestampTz last_start;
1213 : : TimestampTz now;
1214 : : long elapsed;
1215 : :
45 akapila@postgresql.o 1216 [ + + ]:GNC 822 : if (sub->retaindeadtuples)
1217 : : {
1218 : 5 : retain_dead_tuples = true;
1219 : :
1220 : : /*
1221 : : * Create a replication slot to retain information necessary
1222 : : * for conflict detection such as dead tuples, commit
1223 : : * timestamps, and origins.
1224 : : *
1225 : : * The slot is created before starting the apply worker to
1226 : : * prevent it from unnecessarily maintaining its
1227 : : * oldest_nonremovable_xid.
1228 : : *
1229 : : * The slot is created even for a disabled subscription to
1230 : : * ensure that conflict-related information is available when
1231 : : * applying remote changes that occurred before the
1232 : : * subscription was enabled.
1233 : : */
1234 : 5 : CreateConflictDetectionSlot();
1235 : :
4 1236 [ + - ]: 5 : if (sub->retentionactive)
1237 : : {
1238 : : /*
1239 : : * Can't advance xmin of the slot unless all the
1240 : : * subscriptions actively retaining dead tuples are
1241 : : * enabled. This is required to ensure that we don't
1242 : : * advance the xmin of CONFLICT_DETECTION_SLOT if one of
1243 : : * the subscriptions is not enabled. Otherwise, we won't
1244 : : * be able to detect conflicts reliably for such a
1245 : : * subscription even though it has set the
1246 : : * retain_dead_tuples option.
1247 : : */
1248 : 5 : can_update_xmin &= sub->enabled;
1249 : :
1250 : : /*
1251 : : * Initialize the slot once the subscription activiates
1252 : : * retention.
1253 : : */
1254 [ - + ]: 5 : if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
4 akapila@postgresql.o 1255 :UNC 0 : init_conflict_slot_xmin();
1256 : : }
1257 : : }
1258 : :
958 tgl@sss.pgh.pa.us 1259 [ + + ]:CBC 822 : if (!sub->enabled)
1260 : 36 : continue;
1261 : :
1262 : 786 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1263 : 786 : w = logicalrep_worker_find(sub->oid, InvalidOid, false);
1264 : 786 : LWLockRelease(LogicalRepWorkerLock);
1265 : :
1266 [ + + ]: 786 : if (w != NULL)
1267 : : {
1268 : : /*
1269 : : * Compute the minimum xmin required to protect dead tuples
1270 : : * required for conflict detection among all running apply
1271 : : * workers.
1272 : : */
4 akapila@postgresql.o 1273 [ + + ]:GNC 487 : if (sub->retaindeadtuples &&
1274 [ + - + - ]: 3 : sub->retentionactive &&
1275 : : can_update_xmin)
45 1276 : 3 : compute_min_nonremovable_xid(w, &xmin);
1277 : :
1278 : : /* worker is running already */
1279 : 487 : continue;
1280 : : }
1281 : :
1282 : : /*
1283 : : * Can't advance xmin of the slot unless all the workers
1284 : : * corresponding to subscriptions actively retaining dead tuples
1285 : : * are running, disabling the further computation of the minimum
1286 : : * nonremovable xid.
1287 : : */
4 1288 [ + + + - ]: 299 : if (sub->retaindeadtuples && sub->retentionactive)
1289 : 1 : can_update_xmin = false;
1290 : :
1291 : : /*
1292 : : * If the worker is eligible to start now, launch it. Otherwise,
1293 : : * adjust wait_time so that we'll wake up as soon as it can be
1294 : : * started.
1295 : : *
1296 : : * Each subscription's apply worker can only be restarted once per
1297 : : * wal_retrieve_retry_interval, so that errors do not cause us to
1298 : : * repeatedly restart the worker as fast as possible. In cases
1299 : : * where a restart is expected (e.g., subscription parameter
1300 : : * changes), another process should remove the last-start entry
1301 : : * for the subscription so that the worker can be restarted
1302 : : * without waiting for wal_retrieve_retry_interval to elapse.
1303 : : */
958 tgl@sss.pgh.pa.us 1304 :CBC 299 : last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
1305 : 299 : now = GetCurrentTimestamp();
1306 [ + + ]: 299 : if (last_start == 0 ||
1307 [ + + ]: 173 : (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
1308 : : {
1309 : 204 : ApplyLauncherSetWorkerStartTime(sub->oid, now);
74 1310 [ - + ]: 202 : if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
1311 : 204 : sub->dbid, sub->oid, sub->name,
1312 : : sub->owner, InvalidOid,
1313 : : DSM_HANDLE_INVALID,
4 akapila@postgresql.o 1314 [ + + ]:GNC 205 : sub->retaindeadtuples &&
1315 [ + - ]: 1 : sub->retentionactive))
1316 : : {
1317 : : /*
1318 : : * We get here either if we failed to launch a worker
1319 : : * (perhaps for resource-exhaustion reasons) or if we
1320 : : * launched one but it immediately quit. Either way, it
1321 : : * seems appropriate to try again after
1322 : : * wal_retrieve_retry_interval.
1323 : : */
74 tgl@sss.pgh.pa.us 1324 :LBC (1) : wait_time = Min(wait_time,
1325 : : wal_retrieve_retry_interval);
1326 : : }
1327 : : }
1328 : : else
1329 : : {
958 tgl@sss.pgh.pa.us 1330 :CBC 95 : wait_time = Min(wait_time,
1331 : : wal_retrieve_retry_interval - elapsed);
1332 : : }
1333 : : }
1334 : :
1335 : : /*
1336 : : * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
1337 : : * that requires us to retain dead tuples. Otherwise, if required,
1338 : : * advance the slot's xmin to protect dead tuples required for the
1339 : : * conflict detection.
1340 : : *
1341 : : * Additionally, if all apply workers for subscriptions with
1342 : : * retain_dead_tuples enabled have requested to stop retention, the
1343 : : * slot's xmin will be set to InvalidTransactionId allowing the
1344 : : * removal of dead tuples.
1345 : : */
45 akapila@postgresql.o 1346 [ + + ]:GNC 2667 : if (MyReplicationSlot)
1347 : : {
1348 [ + + ]: 6 : if (!retain_dead_tuples)
1349 : 1 : ReplicationSlotDropAcquired();
4 1350 [ + + ]: 5 : else if (can_update_xmin)
1351 : 3 : update_conflict_slot_xmin(xmin);
1352 : : }
1353 : :
1354 : : /* Switch back to original memory context. */
958 tgl@sss.pgh.pa.us 1355 :CBC 2667 : MemoryContextSwitchTo(oldctx);
1356 : : /* Clean the temporary memory. */
1357 : 2667 : MemoryContextDelete(subctx);
1358 : :
1359 : : /* Wait for more work. */
3014 andres@anarazel.de 1360 : 2667 : rc = WaitLatch(MyLatch,
1361 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1362 : : wait_time,
1363 : : WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
1364 : :
1365 [ + + ]: 2664 : if (rc & WL_LATCH_SET)
1366 : : {
1367 : 2647 : ResetLatch(MyLatch);
1368 [ + + ]: 2647 : CHECK_FOR_INTERRUPTS();
1369 : : }
1370 : :
2090 rhaas@postgresql.org 1371 [ + + ]: 2283 : if (ConfigReloadPending)
1372 : : {
1373 : 40 : ConfigReloadPending = false;
3071 peter_e@gmx.net 1374 : 40 : ProcessConfigFile(PGC_SIGHUP);
1375 : : }
1376 : : }
1377 : :
1378 : : /* Not reachable */
1379 : : }
1380 : :
1381 : : /*
1382 : : * Determine the minimum non-removable transaction ID across all apply workers
1383 : : * for subscriptions that have retain_dead_tuples enabled. Store the result
1384 : : * in *xmin.
1385 : : */
1386 : : static void
45 akapila@postgresql.o 1387 :GNC 3 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
1388 : : {
1389 : : TransactionId nonremovable_xid;
1390 : :
1391 [ - + ]: 3 : Assert(worker != NULL);
1392 : :
1393 : : /*
1394 : : * The replication slot for conflict detection must be created before the
1395 : : * worker starts.
1396 : : */
1397 [ - + ]: 3 : Assert(MyReplicationSlot);
1398 : :
1399 [ - + ]: 3 : SpinLockAcquire(&worker->relmutex);
1400 : 3 : nonremovable_xid = worker->oldest_nonremovable_xid;
1401 : 3 : SpinLockRelease(&worker->relmutex);
1402 : :
1403 : : /*
1404 : : * Return if the apply worker has stopped retention concurrently.
1405 : : *
1406 : : * Although this function is invoked only when retentionactive is true,
1407 : : * the apply worker might stop retention after the launcher fetches the
1408 : : * retentionactive flag.
1409 : : */
4 1410 [ - + ]: 3 : if (!TransactionIdIsValid(nonremovable_xid))
4 akapila@postgresql.o 1411 :UNC 0 : return;
1412 : :
45 akapila@postgresql.o 1413 [ - + - - ]:GNC 3 : if (!TransactionIdIsValid(*xmin) ||
45 akapila@postgresql.o 1414 :UNC 0 : TransactionIdPrecedes(nonremovable_xid, *xmin))
45 akapila@postgresql.o 1415 :GNC 3 : *xmin = nonremovable_xid;
1416 : : }
1417 : :
1418 : : /*
1419 : : * Acquire the replication slot used to retain information for conflict
1420 : : * detection, if it exists.
1421 : : *
1422 : : * Return true if successfully acquired, otherwise return false.
1423 : : */
1424 : : static bool
1425 : 387 : acquire_conflict_slot_if_exists(void)
1426 : : {
1427 [ + - ]: 387 : if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
1428 : 387 : return false;
1429 : :
45 akapila@postgresql.o 1430 :UNC 0 : ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
1431 : 0 : return true;
1432 : : }
1433 : :
1434 : : /*
1435 : : * Update the xmin the replication slot used to retain information required
1436 : : * for conflict detection.
1437 : : */
1438 : : static void
4 akapila@postgresql.o 1439 :GNC 3 : update_conflict_slot_xmin(TransactionId new_xmin)
1440 : : {
45 1441 [ - + ]: 3 : Assert(MyReplicationSlot);
4 1442 [ + - - + ]: 3 : Assert(!TransactionIdIsValid(new_xmin) ||
1443 : : TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
1444 : :
1445 : : /* Return if the xmin value of the slot cannot be updated */
45 1446 [ + + ]: 3 : if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
1447 : 2 : return;
1448 : :
1449 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
1450 : 1 : MyReplicationSlot->effective_xmin = new_xmin;
1451 : 1 : MyReplicationSlot->data.xmin = new_xmin;
1452 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1453 : :
1454 [ - + ]: 1 : elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
1455 : :
1456 : 1 : ReplicationSlotMarkDirty();
1457 : 1 : ReplicationSlotsComputeRequiredXmin(false);
1458 : :
1459 : : /*
1460 : : * Like PhysicalConfirmReceivedLocation(), do not save slot information
1461 : : * each time. This is acceptable because all concurrent transactions on
1462 : : * the publisher that require the data preceding the slot's xmin should
1463 : : * have already been applied and flushed on the subscriber before the xmin
1464 : : * is advanced. So, even if the slot's xmin regresses after a restart, it
1465 : : * will be advanced again in the next cycle. Therefore, no data required
1466 : : * for conflict detection will be prematurely removed.
1467 : : */
1468 : 1 : return;
1469 : : }
1470 : :
1471 : : /*
1472 : : * Initialize the xmin for the conflict detection slot.
1473 : : */
1474 : : static void
4 1475 : 3 : init_conflict_slot_xmin(void)
1476 : : {
1477 : : TransactionId xmin_horizon;
1478 : :
1479 : : /* Replication slot must exist but shouldn't be initialized. */
1480 [ + - - + ]: 3 : Assert(MyReplicationSlot &&
1481 : : !TransactionIdIsValid(MyReplicationSlot->data.xmin));
1482 : :
45 1483 : 3 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1484 : :
1485 : 3 : xmin_horizon = GetOldestSafeDecodingTransactionId(false);
1486 : :
1487 [ - + ]: 3 : SpinLockAcquire(&MyReplicationSlot->mutex);
1488 : 3 : MyReplicationSlot->effective_xmin = xmin_horizon;
1489 : 3 : MyReplicationSlot->data.xmin = xmin_horizon;
1490 : 3 : SpinLockRelease(&MyReplicationSlot->mutex);
1491 : :
1492 : 3 : ReplicationSlotsComputeRequiredXmin(true);
1493 : :
1494 : 3 : LWLockRelease(ProcArrayLock);
1495 : :
1496 : : /* Write this slot to disk */
1497 : 3 : ReplicationSlotMarkDirty();
1498 : 3 : ReplicationSlotSave();
1499 : 3 : }
1500 : :
1501 : : /*
1502 : : * Create and acquire the replication slot used to retain information for
1503 : : * conflict detection, if not yet.
1504 : : */
1505 : : void
4 1506 : 6 : CreateConflictDetectionSlot(void)
1507 : : {
1508 : : /* Exit early, if the replication slot is already created and acquired */
1509 [ + + ]: 6 : if (MyReplicationSlot)
1510 : 3 : return;
1511 : :
1512 [ + - ]: 3 : ereport(LOG,
1513 : : errmsg("creating replication conflict detection slot"));
1514 : :
1515 : 3 : ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
1516 : : false, false);
1517 : :
1518 : 3 : init_conflict_slot_xmin();
1519 : : }
1520 : :
1521 : : /*
1522 : : * Is current process the logical replication launcher?
1523 : : */
1524 : : bool
3012 andres@anarazel.de 1525 :CBC 2314 : IsLogicalLauncher(void)
1526 : : {
1527 : 2314 : return LogicalRepCtx->launcher_pid == MyProcPid;
1528 : : }
1529 : :
1530 : : /*
1531 : : * Return the pid of the leader apply worker if the given pid is the pid of a
1532 : : * parallel apply worker, otherwise, return InvalidPid.
1533 : : */
1534 : : pid_t
962 akapila@postgresql.o 1535 : 833 : GetLeaderApplyWorkerPid(pid_t pid)
1536 : : {
1537 : 833 : int leader_pid = InvalidPid;
1538 : : int i;
1539 : :
1540 : 833 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1541 : :
1542 [ + + ]: 4165 : for (i = 0; i < max_logical_replication_workers; i++)
1543 : : {
1544 : 3332 : LogicalRepWorker *w = &LogicalRepCtx->workers[i];
1545 : :
1546 [ + + - + : 3332 : if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
- - - - ]
1547 : : {
962 akapila@postgresql.o 1548 :UBC 0 : leader_pid = w->leader_pid;
1549 : 0 : break;
1550 : : }
1551 : : }
1552 : :
962 akapila@postgresql.o 1553 :CBC 833 : LWLockRelease(LogicalRepWorkerLock);
1554 : :
1555 : 833 : return leader_pid;
1556 : : }
1557 : :
1558 : : /*
1559 : : * Returns state of the subscriptions.
1560 : : */
1561 : : Datum
3152 peter_e@gmx.net 1562 : 1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
1563 : : {
1564 : : #define PG_STAT_GET_SUBSCRIPTION_COLS 10
1565 [ - + ]: 1 : Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
1566 : : int i;
1567 : 1 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1568 : :
1054 michael@paquier.xyz 1569 : 1 : InitMaterializedSRF(fcinfo, 0);
1570 : :
1571 : : /* Make sure we get consistent view of the workers. */
3152 peter_e@gmx.net 1572 : 1 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1573 : :
1187 tgl@sss.pgh.pa.us 1574 [ + + ]: 5 : for (i = 0; i < max_logical_replication_workers; i++)
1575 : : {
1576 : : /* for each row */
1148 peter@eisentraut.org 1577 : 4 : Datum values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1578 : 4 : bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
1579 : : int worker_pid;
1580 : : LogicalRepWorker worker;
1581 : :
3152 peter_e@gmx.net 1582 : 4 : memcpy(&worker, &LogicalRepCtx->workers[i],
1583 : : sizeof(LogicalRepWorker));
1584 [ + + - + ]: 4 : if (!worker.proc || !IsBackendPid(worker.proc->pid))
1585 : 2 : continue;
1586 : :
1587 [ - + - - ]: 2 : if (OidIsValid(subid) && worker.subid != subid)
3152 peter_e@gmx.net 1588 :UBC 0 : continue;
1589 : :
3152 peter_e@gmx.net 1590 :CBC 2 : worker_pid = worker.proc->pid;
1591 : :
1592 : 2 : values[0] = ObjectIdGetDatum(worker.subid);
754 akapila@postgresql.o 1593 [ + - - + ]: 2 : if (isTablesyncWorker(&worker))
3089 peter_e@gmx.net 1594 :UBC 0 : values[1] = ObjectIdGetDatum(worker.relid);
1595 : : else
3089 peter_e@gmx.net 1596 :CBC 2 : nulls[1] = true;
1597 : 2 : values[2] = Int32GetDatum(worker_pid);
1598 : :
962 akapila@postgresql.o 1599 [ + - - + ]: 2 : if (isParallelApplyWorker(&worker))
962 akapila@postgresql.o 1600 :UBC 0 : values[3] = Int32GetDatum(worker.leader_pid);
1601 : : else
3089 peter_e@gmx.net 1602 :CBC 2 : nulls[3] = true;
1603 : :
962 akapila@postgresql.o 1604 [ - + ]: 2 : if (XLogRecPtrIsInvalid(worker.last_lsn))
962 akapila@postgresql.o 1605 :UBC 0 : nulls[4] = true;
1606 : : else
962 akapila@postgresql.o 1607 :CBC 2 : values[4] = LSNGetDatum(worker.last_lsn);
3152 peter_e@gmx.net 1608 [ - + ]: 2 : if (worker.last_send_time == 0)
962 akapila@postgresql.o 1609 :UBC 0 : nulls[5] = true;
1610 : : else
962 akapila@postgresql.o 1611 :CBC 2 : values[5] = TimestampTzGetDatum(worker.last_send_time);
3152 peter_e@gmx.net 1612 [ - + ]: 2 : if (worker.last_recv_time == 0)
962 akapila@postgresql.o 1613 :UBC 0 : nulls[6] = true;
1614 : : else
962 akapila@postgresql.o 1615 :CBC 2 : values[6] = TimestampTzGetDatum(worker.last_recv_time);
3152 peter_e@gmx.net 1616 [ - + ]: 2 : if (XLogRecPtrIsInvalid(worker.reply_lsn))
962 akapila@postgresql.o 1617 :UBC 0 : nulls[7] = true;
1618 : : else
962 akapila@postgresql.o 1619 :CBC 2 : values[7] = LSNGetDatum(worker.reply_lsn);
3152 peter_e@gmx.net 1620 [ - + ]: 2 : if (worker.reply_time == 0)
962 akapila@postgresql.o 1621 :UBC 0 : nulls[8] = true;
1622 : : else
962 akapila@postgresql.o 1623 :CBC 2 : values[8] = TimestampTzGetDatum(worker.reply_time);
1624 : :
712 nathan@postgresql.or 1625 [ + - - - : 2 : switch (worker.type)
- ]
1626 : : {
1627 : 2 : case WORKERTYPE_APPLY:
1628 : 2 : values[9] = CStringGetTextDatum("apply");
1629 : 2 : break;
712 nathan@postgresql.or 1630 :UBC 0 : case WORKERTYPE_PARALLEL_APPLY:
1631 : 0 : values[9] = CStringGetTextDatum("parallel apply");
1632 : 0 : break;
1633 : 0 : case WORKERTYPE_TABLESYNC:
1634 : 0 : values[9] = CStringGetTextDatum("table synchronization");
1635 : 0 : break;
1636 : 0 : case WORKERTYPE_UNKNOWN:
1637 : : /* Should never happen. */
1638 [ # # ]: 0 : elog(ERROR, "unknown worker type");
1639 : : }
1640 : :
1279 michael@paquier.xyz 1641 :CBC 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1642 : : values, nulls);
1643 : :
1644 : : /*
1645 : : * If only a single subscription was requested, and we found it,
1646 : : * break.
1647 : : */
3152 peter_e@gmx.net 1648 [ - + ]: 2 : if (OidIsValid(subid))
3152 peter_e@gmx.net 1649 :UBC 0 : break;
1650 : : }
1651 : :
3152 peter_e@gmx.net 1652 :CBC 1 : LWLockRelease(LogicalRepWorkerLock);
1653 : :
1654 : 1 : return (Datum) 0;
1655 : : }
|