Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to efficiently store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consist out of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origin have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allow us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can setup one replication origin that's from then used as
38 : : * the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
47 : : * * To create and drop replication origins an exclusive lock on
48 : : * pg_replication_slot is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not setup as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/guc.h"
94 : : #include "utils/pg_lsn.h"
95 : : #include "utils/rel.h"
96 : : #include "utils/snapmgr.h"
97 : : #include "utils/syscache.h"
98 : :
99 : : /* paths for replication origin checkpoint files */
100 : : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 : :
103 : : /* GUC variables */
104 : : int max_active_replication_origins = 10;
105 : :
106 : : /*
107 : : * Replay progress of a single remote node.
108 : : */
109 : : typedef struct ReplicationState
110 : : {
111 : : /*
112 : : * Local identifier for the remote node.
113 : : */
114 : : RepOriginId roident;
115 : :
116 : : /*
117 : : * Location of the latest commit from the remote side.
118 : : */
119 : : XLogRecPtr remote_lsn;
120 : :
121 : : /*
122 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : : * during a checkpoint so we know the commit record actually is safe on
124 : : * disk.
125 : : */
126 : : XLogRecPtr local_lsn;
127 : :
128 : : /*
129 : : * PID of backend that's acquired slot, or 0 if none.
130 : : */
131 : : int acquired_by;
132 : :
133 : : /*
134 : : * Condition variable that's signaled when acquired_by changes.
135 : : */
136 : : ConditionVariable origin_cv;
137 : :
138 : : /*
139 : : * Lock protecting remote_lsn and local_lsn.
140 : : */
141 : : LWLock lock;
142 : : } ReplicationState;
143 : :
144 : : /*
145 : : * On disk version of ReplicationState.
146 : : */
147 : : typedef struct ReplicationStateOnDisk
148 : : {
149 : : RepOriginId roident;
150 : : XLogRecPtr remote_lsn;
151 : : } ReplicationStateOnDisk;
152 : :
153 : :
154 : : typedef struct ReplicationStateCtl
155 : : {
156 : : /* Tranche to use for per-origin LWLocks */
157 : : int tranche_id;
158 : : /* Array of length max_active_replication_origins */
159 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
160 : : } ReplicationStateCtl;
161 : :
162 : : /* external variables */
163 : : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164 : : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
165 : : TimestampTz replorigin_session_origin_timestamp = 0;
166 : :
167 : : /*
168 : : * Base address into a shared memory array of replication states of size
169 : : * max_active_replication_origins.
170 : : */
171 : : static ReplicationState *replication_states;
172 : :
173 : : /*
174 : : * Actual shared memory block (replication_states[] is now part of this).
175 : : */
176 : : static ReplicationStateCtl *replication_states_ctl;
177 : :
178 : : /*
179 : : * We keep a pointer to this backend's ReplicationState to avoid having to
180 : : * search the replication_states array in replorigin_session_advance for each
181 : : * remote commit. (Ownership of a backend's own entry can only be changed by
182 : : * that backend.)
183 : : */
184 : : static ReplicationState *session_replication_state = NULL;
185 : :
186 : : /* Magic for on disk files. */
187 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 : :
189 : : static void
271 msawada@postgresql.o 190 :CBC 58 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : : {
192 [ + + - + ]: 58 : if (check_origins && max_active_replication_origins == 0)
3885 andres@anarazel.de 193 [ # # ]:UBC 0 : ereport(ERROR,
194 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 : :
3885 andres@anarazel.de 197 [ + + - + ]:CBC 58 : if (!recoveryOK && RecoveryInProgress())
3885 andres@anarazel.de 198 [ # # ]:UBC 0 : ereport(ERROR,
199 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : : errmsg("cannot manipulate replication origins during recovery")));
3885 andres@anarazel.de 201 :CBC 58 : }
202 : :
203 : :
204 : : /*
205 : : * IsReservedOriginName
206 : : * True iff name is either "none" or "any".
207 : : */
208 : : static bool
1245 akapila@postgresql.o 209 : 12 : IsReservedOriginName(const char *name)
210 : : {
211 [ + + + + ]: 23 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 : 11 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : : }
214 : :
215 : : /* ---------------------------------------------------------------------------
216 : : * Functions for working with replication origins themselves.
217 : : * ---------------------------------------------------------------------------
218 : : */
219 : :
220 : : /*
221 : : * Check for a persistent replication origin identified by name.
222 : : *
223 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : : */
225 : : RepOriginId
1651 peter@eisentraut.org 226 : 985 : replorigin_by_name(const char *roname, bool missing_ok)
227 : : {
228 : : Form_pg_replication_origin ident;
3861 bruce@momjian.us 229 : 985 : Oid roident = InvalidOid;
230 : : HeapTuple tuple;
231 : : Datum roname_d;
232 : :
3885 andres@anarazel.de 233 : 985 : roname_d = CStringGetTextDatum(roname);
234 : :
235 : 985 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 [ + + ]: 985 : if (HeapTupleIsValid(tuple))
237 : : {
238 : 596 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 : 596 : roident = ident->roident;
240 : 596 : ReleaseSysCache(tuple);
241 : : }
242 [ + + ]: 389 : else if (!missing_ok)
2995 rhaas@postgresql.org 243 [ + - ]: 4 : ereport(ERROR,
244 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : : errmsg("replication origin \"%s\" does not exist",
246 : : roname)));
247 : :
3885 andres@anarazel.de 248 : 981 : return roident;
249 : : }
250 : :
251 : : /*
252 : : * Create a replication origin.
253 : : *
254 : : * Needs to be called in a transaction.
255 : : */
256 : : RepOriginId
1651 peter@eisentraut.org 257 : 376 : replorigin_create(const char *roname)
258 : : {
259 : : Oid roident;
3861 bruce@momjian.us 260 : 376 : HeapTuple tuple = NULL;
261 : : Relation rel;
262 : : Datum roname_d;
263 : : SnapshotData SnapshotDirty;
264 : : SysScanDesc scan;
265 : : ScanKeyData key;
266 : :
267 : : /*
268 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
269 : : * replication origin names to 512 bytes. This should be more than enough
270 : : * for all practical use.
271 : : */
224 nathan@postgresql.or 272 [ + + ]: 376 : if (strlen(roname) > MAX_RONAME_LEN)
273 [ + - ]: 3 : ereport(ERROR,
274 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 : : errmsg("replication origin name is too long"),
276 : : errdetail("Replication origin names must be no longer than %d bytes.",
277 : : MAX_RONAME_LEN)));
278 : :
3885 andres@anarazel.de 279 : 373 : roname_d = CStringGetTextDatum(roname);
280 : :
281 [ - + ]: 373 : Assert(IsTransactionState());
282 : :
283 : : /*
284 : : * We need the numeric replication origin to be 16bit wide, so we cannot
285 : : * rely on the normal oid allocation. Instead we simply scan
286 : : * pg_replication_origin for the first unused id. That's not particularly
287 : : * efficient, but this should be a fairly infrequent operation - we can
288 : : * easily spend a bit more code on this when it turns out it needs to be
289 : : * faster.
290 : : *
291 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
292 : : * over the table for the duration of the search. Because we use a "dirty
293 : : * snapshot" we can read rows that other in-progress sessions have
294 : : * written, even though they would be invisible with normal snapshots. Due
295 : : * to the exclusive lock there's no danger that new rows can appear while
296 : : * we're checking.
297 : : */
298 : 373 : InitDirtySnapshot(SnapshotDirty);
299 : :
2522 300 : 373 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301 : :
302 : : /*
303 : : * We want to be able to access pg_replication_origin without setting up a
304 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
305 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
307 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
308 : : * be sure to set up a snapshot everywhere it might be needed. For more
309 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 : : */
224 nathan@postgresql.or 311 [ - + ]: 373 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312 : :
3884 andres@anarazel.de 313 [ + - ]: 680 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 : : {
315 : : bool nulls[Natts_pg_replication_origin];
316 : : Datum values[Natts_pg_replication_origin];
317 : : bool collides;
318 : :
3885 319 [ - + ]: 680 : CHECK_FOR_INTERRUPTS();
320 : :
321 : 680 : ScanKeyInit(&key,
322 : : Anum_pg_replication_origin_roident,
323 : : BTEqualStrategyNumber, F_OIDEQ,
324 : : ObjectIdGetDatum(roident));
325 : :
326 : 680 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 : : true /* indexOK */ ,
328 : : &SnapshotDirty,
329 : : 1, &key);
330 : :
331 : 680 : collides = HeapTupleIsValid(systable_getnext(scan));
332 : :
333 : 680 : systable_endscan(scan);
334 : :
335 [ + + ]: 680 : if (!collides)
336 : : {
337 : : /*
338 : : * Ok, found an unused roident, insert the new row and do a CCI,
339 : : * so our callers can look it up if they want to.
340 : : */
341 : 373 : memset(&nulls, 0, sizeof(nulls));
342 : :
3861 bruce@momjian.us 343 : 373 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
3885 andres@anarazel.de 344 : 373 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
345 : :
346 : 373 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
3242 alvherre@alvh.no-ip. 347 : 373 : CatalogTupleInsert(rel, tuple);
3885 andres@anarazel.de 348 : 372 : CommandCounterIncrement();
349 : 372 : break;
350 : : }
351 : : }
352 : :
353 : : /* now release lock again, */
2522 354 : 372 : table_close(rel, ExclusiveLock);
355 : :
3885 356 [ - + ]: 372 : if (tuple == NULL)
3885 andres@anarazel.de 357 [ # # ]:UBC 0 : ereport(ERROR,
358 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : : errmsg("could not find free replication origin ID")));
360 : :
3885 andres@anarazel.de 361 :CBC 372 : heap_freetuple(tuple);
362 : 372 : return roident;
363 : : }
364 : :
365 : : /*
366 : : * Helper function to drop a replication origin.
367 : : */
368 : : static void
1048 akapila@postgresql.o 369 : 307 : replorigin_state_clear(RepOriginId roident, bool nowait)
370 : : {
371 : : int i;
372 : :
373 : : /*
374 : : * Clean up the slot state info, if there is any matching slot.
375 : : */
3053 alvherre@alvh.no-ip. 376 : 307 : restart:
3885 andres@anarazel.de 377 : 307 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378 : :
271 msawada@postgresql.o 379 [ + + ]: 1013 : for (i = 0; i < max_active_replication_origins; i++)
380 : : {
3885 andres@anarazel.de 381 : 968 : ReplicationState *state = &replication_states[i];
382 : :
383 [ + + ]: 968 : if (state->roident == roident)
384 : : {
385 : : /* found our slot, is it busy? */
386 [ - + ]: 262 : if (state->acquired_by != 0)
387 : : {
388 : : ConditionVariable *cv;
389 : :
3053 alvherre@alvh.no-ip. 390 [ # # ]:UBC 0 : if (nowait)
391 [ # # ]: 0 : ereport(ERROR,
392 : : (errcode(ERRCODE_OBJECT_IN_USE),
393 : : errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 : : state->roident,
395 : : state->acquired_by)));
396 : :
397 : : /*
398 : : * We must wait and then retry. Since we don't know which CV
399 : : * to wait on until here, we can't readily use
400 : : * ConditionVariablePrepareToSleep (calling it here would be
401 : : * wrong, since we could miss the signal if we did so); just
402 : : * use ConditionVariableSleep directly.
403 : : */
404 : 0 : cv = &state->origin_cv;
405 : :
406 : 0 : LWLockRelease(ReplicationOriginLock);
407 : :
408 : 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 : 0 : goto restart;
410 : : }
411 : :
412 : : /* first make a WAL log entry */
413 : : {
414 : : xl_replorigin_drop xlrec;
415 : :
3885 andres@anarazel.de 416 :CBC 262 : xlrec.node_id = roident;
417 : 262 : XLogBeginInsert();
309 peter@eisentraut.org 418 : 262 : XLogRegisterData(&xlrec, sizeof(xlrec));
3885 andres@anarazel.de 419 : 262 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 : : }
421 : :
422 : : /* then clear the in-memory slot */
423 : 262 : state->roident = InvalidRepOriginId;
424 : 262 : state->remote_lsn = InvalidXLogRecPtr;
425 : 262 : state->local_lsn = InvalidXLogRecPtr;
426 : 262 : break;
427 : : }
428 : : }
429 : 307 : LWLockRelease(ReplicationOriginLock);
2899 tgl@sss.pgh.pa.us 430 : 307 : ConditionVariableCancelSleep();
3885 andres@anarazel.de 431 : 307 : }
432 : :
433 : : /*
434 : : * Drop replication origin (by name).
435 : : *
436 : : * Needs to be called in a transaction.
437 : : */
438 : : void
1651 peter@eisentraut.org 439 : 496 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440 : : {
441 : : RepOriginId roident;
442 : : Relation rel;
443 : : HeapTuple tuple;
444 : :
1771 akapila@postgresql.o 445 [ - + ]: 496 : Assert(IsTransactionState());
446 : :
1048 447 : 496 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448 : :
1771 449 : 496 : roident = replorigin_by_name(name, missing_ok);
450 : :
451 : : /* Lock the origin to prevent concurrent drops. */
1048 452 : 495 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
453 : : AccessExclusiveLock);
454 : :
455 : 495 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 [ + + ]: 495 : if (!HeapTupleIsValid(tuple))
457 : : {
458 [ - + ]: 188 : if (!missing_ok)
1048 akapila@postgresql.o 459 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 : : roident);
461 : :
462 : : /*
463 : : * We don't need to retain the locks if the origin is already dropped.
464 : : */
1048 akapila@postgresql.o 465 :CBC 188 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
466 : : AccessExclusiveLock);
467 : 188 : table_close(rel, RowExclusiveLock);
468 : 188 : return;
469 : : }
470 : :
471 : 307 : replorigin_state_clear(roident, nowait);
472 : :
473 : : /*
474 : : * Now, we can delete the catalog entry.
475 : : */
476 : 307 : CatalogTupleDelete(rel, &tuple->t_self);
477 : 307 : ReleaseSysCache(tuple);
478 : :
479 : 307 : CommandCounterIncrement();
480 : :
481 : : /* We keep the lock on pg_replication_origin until commit */
1771 482 : 307 : table_close(rel, NoLock);
483 : : }
484 : :
485 : : /*
486 : : * Lookup replication origin via its oid and return the name.
487 : : *
488 : : * The external name is palloc'd in the calling context.
489 : : *
490 : : * Returns true if the origin is known, false otherwise.
491 : : */
492 : : bool
3885 andres@anarazel.de 493 : 27 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : : {
495 : : HeapTuple tuple;
496 : : Form_pg_replication_origin ric;
497 : :
498 [ - + ]: 27 : Assert(OidIsValid((Oid) roident));
499 [ - + ]: 27 : Assert(roident != InvalidRepOriginId);
500 [ - + ]: 27 : Assert(roident != DoNotReplicateId);
501 : :
502 : 27 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : : ObjectIdGetDatum((Oid) roident));
504 : :
505 [ + + ]: 27 : if (HeapTupleIsValid(tuple))
506 : : {
507 : 24 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 : 24 : *roname = text_to_cstring(&ric->roname);
509 : 24 : ReleaseSysCache(tuple);
510 : :
511 : 24 : return true;
512 : : }
513 : : else
514 : : {
515 : 3 : *roname = NULL;
516 : :
517 [ - + ]: 3 : if (!missing_ok)
2995 rhaas@postgresql.org 518 [ # # ]:UBC 0 : ereport(ERROR,
519 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : : errmsg("replication origin with ID %d does not exist",
521 : : roident)));
522 : :
3885 andres@anarazel.de 523 :CBC 3 : return false;
524 : : }
525 : : }
526 : :
527 : :
528 : : /* ---------------------------------------------------------------------------
529 : : * Functions for handling replication progress.
530 : : * ---------------------------------------------------------------------------
531 : : */
532 : :
533 : : Size
534 : 4122 : ReplicationOriginShmemSize(void)
535 : : {
536 : 4122 : Size size = 0;
537 : :
271 msawada@postgresql.o 538 [ + + ]: 4122 : if (max_active_replication_origins == 0)
3885 andres@anarazel.de 539 : 2 : return size;
540 : :
541 : 4120 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 : :
543 : 4120 : size = add_size(size,
544 : : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 : 4120 : return size;
546 : : }
547 : :
548 : : void
549 : 1069 : ReplicationOriginShmemInit(void)
550 : : {
551 : : bool found;
552 : :
271 msawada@postgresql.o 553 [ + + ]: 1069 : if (max_active_replication_origins == 0)
3885 andres@anarazel.de 554 : 1 : return;
555 : :
556 : 1068 : replication_states_ctl = (ReplicationStateCtl *)
557 : 1068 : ShmemInitStruct("ReplicationOriginState",
558 : : ReplicationOriginShmemSize(),
559 : : &found);
3861 bruce@momjian.us 560 : 1068 : replication_states = replication_states_ctl->states;
561 : :
3885 andres@anarazel.de 562 [ + - ]: 1068 : if (!found)
563 : : {
564 : : int i;
565 : :
2042 tgl@sss.pgh.pa.us 566 [ + - + - : 76833 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - + - +
+ ]
567 : :
568 : 1068 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 : :
271 msawada@postgresql.o 570 [ + + ]: 11739 : for (i = 0; i < max_active_replication_origins; i++)
571 : : {
3885 andres@anarazel.de 572 : 10671 : LWLockInitialize(&replication_states[i].lock,
573 : 10671 : replication_states_ctl->tranche_id);
3053 alvherre@alvh.no-ip. 574 : 10671 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : : }
576 : : }
577 : : }
578 : :
579 : : /* ---------------------------------------------------------------------------
580 : : * Perform a checkpoint of each replication origin's progress with respect to
581 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : : * if the transactions were originally committed asynchronously.
584 : : *
585 : : * We store checkpoints in the following format:
586 : : * +-------+------------------------+------------------+-----+--------+
587 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : : * +-------+------------------------+------------------+-----+--------+
589 : : *
590 : : * So its just the magic, followed by the statically sized
591 : : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : : * ReplicationState is determined by max_active_replication_origins.
593 : : * ---------------------------------------------------------------------------
594 : : */
595 : : void
3885 andres@anarazel.de 596 : 1729 : CheckPointReplicationOrigin(void)
597 : : {
474 michael@paquier.xyz 598 : 1729 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 : 1729 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : : int tmpfd;
601 : : int i;
3885 andres@anarazel.de 602 : 1729 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : : pg_crc32c crc;
604 : :
271 msawada@postgresql.o 605 [ + + ]: 1729 : if (max_active_replication_origins == 0)
3885 andres@anarazel.de 606 : 1 : return;
607 : :
608 : 1728 : INIT_CRC32C(crc);
609 : :
610 : : /* make sure no old temp file is remaining */
611 [ + - - + ]: 1728 : if (unlink(tmppath) < 0 && errno != ENOENT)
3885 andres@anarazel.de 612 [ # # ]:UBC 0 : ereport(PANIC,
613 : : (errcode_for_file_access(),
614 : : errmsg("could not remove file \"%s\": %m",
615 : : tmppath)));
616 : :
617 : : /*
618 : : * no other backend can perform this at the same time; only one checkpoint
619 : : * can happen at a time.
620 : : */
3007 peter_e@gmx.net 621 :CBC 1728 : tmpfd = OpenTransientFile(tmppath,
622 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3885 andres@anarazel.de 623 [ - + ]: 1728 : if (tmpfd < 0)
3885 andres@anarazel.de 624 [ # # ]:UBC 0 : ereport(PANIC,
625 : : (errcode_for_file_access(),
626 : : errmsg("could not create file \"%s\": %m",
627 : : tmppath)));
628 : :
629 : : /* write magic */
2691 michael@paquier.xyz 630 :CBC 1728 : errno = 0;
3885 andres@anarazel.de 631 [ - + ]: 1728 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : : {
633 : : /* if write didn't set errno, assume problem is no disk space */
2436 michael@paquier.xyz 634 [ # # ]:UBC 0 : if (errno == 0)
635 : 0 : errno = ENOSPC;
3885 andres@anarazel.de 636 [ # # ]: 0 : ereport(PANIC,
637 : : (errcode_for_file_access(),
638 : : errmsg("could not write to file \"%s\": %m",
639 : : tmppath)));
640 : : }
3885 andres@anarazel.de 641 :CBC 1728 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 : :
643 : : /* prevent concurrent creations/drops */
644 : 1728 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 : :
646 : : /* write actual data */
271 msawada@postgresql.o 647 [ + + ]: 19008 : for (i = 0; i < max_active_replication_origins; i++)
648 : : {
649 : : ReplicationStateOnDisk disk_state;
3885 andres@anarazel.de 650 : 17280 : ReplicationState *curstate = &replication_states[i];
651 : : XLogRecPtr local_lsn;
652 : :
653 [ + + ]: 17280 : if (curstate->roident == InvalidRepOriginId)
654 : 17232 : continue;
655 : :
656 : : /* zero, to avoid uninitialized padding bytes */
3160 657 : 48 : memset(&disk_state, 0, sizeof(disk_state));
658 : :
3885 659 : 48 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 : :
661 : 48 : disk_state.roident = curstate->roident;
662 : :
663 : 48 : disk_state.remote_lsn = curstate->remote_lsn;
664 : 48 : local_lsn = curstate->local_lsn;
665 : :
666 : 48 : LWLockRelease(&curstate->lock);
667 : :
668 : : /* make sure we only write out a commit that's persistent */
669 : 48 : XLogFlush(local_lsn);
670 : :
2691 michael@paquier.xyz 671 : 48 : errno = 0;
3885 andres@anarazel.de 672 [ - + ]: 48 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : : sizeof(disk_state))
674 : : {
675 : : /* if write didn't set errno, assume problem is no disk space */
2436 michael@paquier.xyz 676 [ # # ]:UBC 0 : if (errno == 0)
677 : 0 : errno = ENOSPC;
3885 andres@anarazel.de 678 [ # # ]: 0 : ereport(PANIC,
679 : : (errcode_for_file_access(),
680 : : errmsg("could not write to file \"%s\": %m",
681 : : tmppath)));
682 : : }
683 : :
3885 andres@anarazel.de 684 :CBC 48 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : : }
686 : :
687 : 1728 : LWLockRelease(ReplicationOriginLock);
688 : :
689 : : /* write out the CRC */
690 : 1728 : FIN_CRC32C(crc);
2691 michael@paquier.xyz 691 : 1728 : errno = 0;
3885 andres@anarazel.de 692 [ - + ]: 1728 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : : {
694 : : /* if write didn't set errno, assume problem is no disk space */
2436 michael@paquier.xyz 695 [ # # ]:UBC 0 : if (errno == 0)
696 : 0 : errno = ENOSPC;
3885 andres@anarazel.de 697 [ # # ]: 0 : ereport(PANIC,
698 : : (errcode_for_file_access(),
699 : : errmsg("could not write to file \"%s\": %m",
700 : : tmppath)));
701 : : }
702 : :
2356 peter@eisentraut.org 703 [ - + ]:CBC 1728 : if (CloseTransientFile(tmpfd) != 0)
2475 michael@paquier.xyz 704 [ # # ]:UBC 0 : ereport(PANIC,
705 : : (errcode_for_file_access(),
706 : : errmsg("could not close file \"%s\": %m",
707 : : tmppath)));
708 : :
709 : : /* fsync, rename to permanent file, fsync file and directory */
3570 andres@anarazel.de 710 :CBC 1728 : durable_rename(tmppath, path, PANIC);
711 : : }
712 : :
713 : : /*
714 : : * Recover replication replay status from checkpoint data saved earlier by
715 : : * CheckPointReplicationOrigin.
716 : : *
717 : : * This only needs to be called at startup and *not* during every checkpoint
718 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : : * state thereafter can be recovered by looking at commit records.
720 : : */
721 : : void
3885 722 : 925 : StartupReplicationOrigin(void)
723 : : {
474 michael@paquier.xyz 724 : 925 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : : int fd;
726 : : int readBytes;
3861 bruce@momjian.us 727 : 925 : uint32 magic = REPLICATION_STATE_MAGIC;
728 : 925 : int last_state = 0;
729 : : pg_crc32c file_crc;
730 : : pg_crc32c crc;
731 : :
732 : : /* don't want to overwrite already existing state */
733 : : #ifdef USE_ASSERT_CHECKING
734 : : static bool already_started = false;
735 : :
3885 andres@anarazel.de 736 [ - + ]: 925 : Assert(!already_started);
737 : 925 : already_started = true;
738 : : #endif
739 : :
271 msawada@postgresql.o 740 [ + + ]: 925 : if (max_active_replication_origins == 0)
3885 andres@anarazel.de 741 : 52 : return;
742 : :
743 : 924 : INIT_CRC32C(crc);
744 : :
745 [ + + ]: 924 : elog(DEBUG2, "starting up replication origin progress state");
746 : :
3007 peter_e@gmx.net 747 : 924 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 : :
749 : : /*
750 : : * might have had max_active_replication_origins == 0 last run, or we just
751 : : * brought up a standby.
752 : : */
3885 andres@anarazel.de 753 [ + + + - ]: 924 : if (fd < 0 && errno == ENOENT)
754 : 51 : return;
755 [ - + ]: 873 : else if (fd < 0)
3885 andres@anarazel.de 756 [ # # ]:UBC 0 : ereport(PANIC,
757 : : (errcode_for_file_access(),
758 : : errmsg("could not open file \"%s\": %m",
759 : : path)));
760 : :
761 : : /* verify magic, that is written even if nothing was active */
3885 andres@anarazel.de 762 :CBC 873 : readBytes = read(fd, &magic, sizeof(magic));
763 [ - + ]: 873 : if (readBytes != sizeof(magic))
764 : : {
2709 michael@paquier.xyz 765 [ # # ]:UBC 0 : if (readBytes < 0)
766 [ # # ]: 0 : ereport(PANIC,
767 : : (errcode_for_file_access(),
768 : : errmsg("could not read file \"%s\": %m",
769 : : path)));
770 : : else
771 [ # # ]: 0 : ereport(PANIC,
772 : : (errcode(ERRCODE_DATA_CORRUPTED),
773 : : errmsg("could not read file \"%s\": read %d of %zu",
774 : : path, readBytes, sizeof(magic))));
775 : : }
3885 andres@anarazel.de 776 :CBC 873 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 : :
778 [ - + ]: 873 : if (magic != REPLICATION_STATE_MAGIC)
3885 andres@anarazel.de 779 [ # # ]:UBC 0 : ereport(PANIC,
780 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : : magic, REPLICATION_STATE_MAGIC)));
782 : :
783 : : /* we can skip locking here, no other access is possible */
784 : :
785 : : /* recover individual states, until there are no more to be found */
786 : : while (true)
3885 andres@anarazel.de 787 :CBC 23 : {
788 : : ReplicationStateOnDisk disk_state;
789 : :
790 : 896 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 : :
792 [ - + ]: 896 : if (readBytes < 0)
793 : : {
3885 andres@anarazel.de 794 [ # # ]:UBC 0 : ereport(PANIC,
795 : : (errcode_for_file_access(),
796 : : errmsg("could not read file \"%s\": %m",
797 : : path)));
798 : : }
799 : :
800 : : /* no further data */
16 peter@eisentraut.org 801 [ + + ]:GNC 896 : if (readBytes == sizeof(crc))
802 : : {
803 : 873 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
804 : 873 : break;
805 : : }
806 : :
3885 andres@anarazel.de 807 [ - + ]:CBC 23 : if (readBytes != sizeof(disk_state))
808 : : {
3885 andres@anarazel.de 809 [ # # ]:UBC 0 : ereport(PANIC,
810 : : (errcode_for_file_access(),
811 : : errmsg("could not read file \"%s\": read %d of %zu",
812 : : path, readBytes, sizeof(disk_state))));
813 : : }
814 : :
3885 andres@anarazel.de 815 :CBC 23 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
816 : :
271 msawada@postgresql.o 817 [ - + ]: 23 : if (last_state == max_active_replication_origins)
3885 andres@anarazel.de 818 [ # # ]:UBC 0 : ereport(PANIC,
819 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
820 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
821 : :
822 : : /* copy data to shared memory */
3885 andres@anarazel.de 823 :CBC 23 : replication_states[last_state].roident = disk_state.roident;
824 : 23 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
825 : 23 : last_state++;
826 : :
1839 peter@eisentraut.org 827 [ + - ]: 23 : ereport(LOG,
828 : : errmsg("recovered replication state of node %d to %X/%08X",
829 : : disk_state.roident,
830 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
831 : : }
832 : :
833 : : /* now check checksum */
3885 andres@anarazel.de 834 : 873 : FIN_CRC32C(crc);
835 [ - + ]: 873 : if (file_crc != crc)
3885 andres@anarazel.de 836 [ # # ]:UBC 0 : ereport(PANIC,
837 : : (errcode(ERRCODE_DATA_CORRUPTED),
838 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
839 : : crc, file_crc)));
840 : :
2356 peter@eisentraut.org 841 [ - + ]:CBC 873 : if (CloseTransientFile(fd) != 0)
2475 michael@paquier.xyz 842 [ # # ]:UBC 0 : ereport(PANIC,
843 : : (errcode_for_file_access(),
844 : : errmsg("could not close file \"%s\": %m",
845 : : path)));
846 : : }
847 : :
848 : : void
3885 andres@anarazel.de 849 :CBC 4 : replorigin_redo(XLogReaderState *record)
850 : : {
851 : 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
852 : :
853 [ + + - ]: 4 : switch (info)
854 : : {
855 : 2 : case XLOG_REPLORIGIN_SET:
856 : : {
857 : 2 : xl_replorigin_set *xlrec =
943 tgl@sss.pgh.pa.us 858 : 2 : (xl_replorigin_set *) XLogRecGetData(record);
859 : :
3885 andres@anarazel.de 860 : 2 : replorigin_advance(xlrec->node_id,
861 : : xlrec->remote_lsn, record->EndRecPtr,
3861 bruce@momjian.us 862 : 2 : xlrec->force /* backward */ ,
863 : : false /* WAL log */ );
3885 andres@anarazel.de 864 : 2 : break;
865 : : }
866 : 2 : case XLOG_REPLORIGIN_DROP:
867 : : {
868 : : xl_replorigin_drop *xlrec;
869 : : int i;
870 : :
871 : 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
872 : :
271 msawada@postgresql.o 873 [ + - ]: 3 : for (i = 0; i < max_active_replication_origins; i++)
874 : : {
3885 andres@anarazel.de 875 : 3 : ReplicationState *state = &replication_states[i];
876 : :
877 : : /* found our slot */
878 [ + + ]: 3 : if (state->roident == xlrec->node_id)
879 : : {
880 : : /* reset entry */
881 : 2 : state->roident = InvalidRepOriginId;
882 : 2 : state->remote_lsn = InvalidXLogRecPtr;
883 : 2 : state->local_lsn = InvalidXLogRecPtr;
884 : 2 : break;
885 : : }
886 : : }
887 : 2 : break;
888 : : }
3885 andres@anarazel.de 889 :UBC 0 : default:
890 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
891 : : }
3885 andres@anarazel.de 892 :CBC 4 : }
893 : :
894 : :
895 : : /*
896 : : * Tell the replication origin progress machinery that a commit from 'node'
897 : : * that originated at the LSN remote_commit on the remote node was replayed
898 : : * successfully and that we don't need to do so again. In combination with
899 : : * setting up replorigin_session_origin_lsn and replorigin_session_origin
900 : : * that ensures we won't lose knowledge about that after a crash if the
901 : : * transaction had a persistent effect (think of asynchronous commits).
902 : : *
903 : : * local_commit needs to be a local LSN of the commit so that we can make sure
904 : : * upon a checkpoint that enough WAL has been persisted to disk.
905 : : *
906 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
907 : : * unless running in recovery.
908 : : */
909 : : void
910 : 240 : replorigin_advance(RepOriginId node,
911 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
912 : : bool go_backward, bool wal_log)
913 : : {
914 : : int i;
915 : 240 : ReplicationState *replication_state = NULL;
916 : 240 : ReplicationState *free_state = NULL;
917 : :
918 [ - + ]: 240 : Assert(node != InvalidRepOriginId);
919 : :
920 : : /* we don't track DoNotReplicateId */
921 [ - + ]: 240 : if (node == DoNotReplicateId)
3885 andres@anarazel.de 922 :UBC 0 : return;
923 : :
924 : : /*
925 : : * XXX: For the case where this is called by WAL replay, it'd be more
926 : : * efficient to restore into a backend local hashtable and only dump into
927 : : * shmem after recovery is finished. Let's wait with implementing that
928 : : * till it's shown to be a measurable expense
929 : : */
930 : :
931 : : /* Lock exclusively, as we may have to create a new table entry. */
3885 andres@anarazel.de 932 :CBC 240 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
933 : :
934 : : /*
935 : : * Search for either an existing slot for the origin, or a free one we can
936 : : * use.
937 : : */
271 msawada@postgresql.o 938 [ + + ]: 2202 : for (i = 0; i < max_active_replication_origins; i++)
939 : : {
3885 andres@anarazel.de 940 : 2007 : ReplicationState *curstate = &replication_states[i];
941 : :
942 : : /* remember where to insert if necessary */
943 [ + + + + ]: 2007 : if (curstate->roident == InvalidRepOriginId &&
944 : : free_state == NULL)
945 : : {
946 : 196 : free_state = curstate;
947 : 196 : continue;
948 : : }
949 : :
950 : : /* not our slot */
951 [ + + ]: 1811 : if (curstate->roident != node)
952 : : {
953 : 1766 : continue;
954 : : }
955 : :
956 : : /* ok, found slot */
957 : 45 : replication_state = curstate;
958 : :
959 : 45 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
960 : :
961 : : /* Make sure it's not used by somebody else */
962 [ - + ]: 45 : if (replication_state->acquired_by != 0)
963 : : {
3885 andres@anarazel.de 964 [ # # ]:UBC 0 : ereport(ERROR,
965 : : (errcode(ERRCODE_OBJECT_IN_USE),
966 : : errmsg("replication origin with ID %d is already active for PID %d",
967 : : replication_state->roident,
968 : : replication_state->acquired_by)));
969 : : }
970 : :
3885 andres@anarazel.de 971 :CBC 45 : break;
972 : : }
973 : :
974 [ + + - + ]: 240 : if (replication_state == NULL && free_state == NULL)
3885 andres@anarazel.de 975 [ # # ]:UBC 0 : ereport(ERROR,
976 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
977 : : errmsg("could not find free replication state slot for replication origin with ID %d",
978 : : node),
979 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
980 : :
3885 andres@anarazel.de 981 [ + + ]:CBC 240 : if (replication_state == NULL)
982 : : {
983 : : /* initialize new slot */
984 : 195 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
985 : 195 : replication_state = free_state;
41 alvherre@kurilemu.de 986 [ - + ]:GNC 195 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
987 [ - + ]: 195 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
3885 andres@anarazel.de 988 :CBC 195 : replication_state->roident = node;
989 : : }
990 : :
991 [ - + ]: 240 : Assert(replication_state->roident != InvalidRepOriginId);
992 : :
993 : : /*
994 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
995 : : * and the standby gets the message. Primarily this will be called during
996 : : * WAL replay (of commit records) where no WAL logging is necessary.
997 : : */
998 [ + + ]: 240 : if (wal_log)
999 : : {
1000 : : xl_replorigin_set xlrec;
1001 : :
1002 : 199 : xlrec.remote_lsn = remote_commit;
1003 : 199 : xlrec.node_id = node;
1004 : 199 : xlrec.force = go_backward;
1005 : :
1006 : 199 : XLogBeginInsert();
309 peter@eisentraut.org 1007 : 199 : XLogRegisterData(&xlrec, sizeof(xlrec));
1008 : :
3885 andres@anarazel.de 1009 : 199 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1010 : : }
1011 : :
1012 : : /*
1013 : : * Due to - harmless - race conditions during a checkpoint we could see
1014 : : * values here that are older than the ones we already have in memory. We
1015 : : * could also see older values for prepared transactions when the prepare
1016 : : * is sent at a later point of time along with commit prepared and there
1017 : : * are other transactions commits between prepare and commit prepared. See
1018 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1019 : : */
1020 [ + + + + ]: 240 : if (go_backward || replication_state->remote_lsn < remote_commit)
1021 : 233 : replication_state->remote_lsn = remote_commit;
41 alvherre@kurilemu.de 1022 [ + + + + ]:GNC 240 : if (XLogRecPtrIsValid(local_commit) &&
3885 andres@anarazel.de 1023 [ + - ]:CBC 38 : (go_backward || replication_state->local_lsn < local_commit))
1024 : 40 : replication_state->local_lsn = local_commit;
1025 : 240 : LWLockRelease(&replication_state->lock);
1026 : :
1027 : : /*
1028 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1029 : : * otherwise be dropped anytime.
1030 : : */
1031 : 240 : LWLockRelease(ReplicationOriginLock);
1032 : : }
1033 : :
1034 : :
1035 : : XLogRecPtr
1036 : 8 : replorigin_get_progress(RepOriginId node, bool flush)
1037 : : {
1038 : : int i;
1039 : 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1040 : 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1041 : :
1042 : : /* prevent slots from being concurrently dropped */
1043 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1044 : :
271 msawada@postgresql.o 1045 [ + + ]: 38 : for (i = 0; i < max_active_replication_origins; i++)
1046 : : {
1047 : : ReplicationState *state;
1048 : :
3885 andres@anarazel.de 1049 : 35 : state = &replication_states[i];
1050 : :
1051 [ + + ]: 35 : if (state->roident == node)
1052 : : {
1053 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1054 : :
1055 : 5 : remote_lsn = state->remote_lsn;
1056 : 5 : local_lsn = state->local_lsn;
1057 : :
1058 : 5 : LWLockRelease(&state->lock);
1059 : :
1060 : 5 : break;
1061 : : }
1062 : : }
1063 : :
1064 : 8 : LWLockRelease(ReplicationOriginLock);
1065 : :
41 alvherre@kurilemu.de 1066 [ + + + - ]:GNC 8 : if (flush && XLogRecPtrIsValid(local_lsn))
3885 andres@anarazel.de 1067 :CBC 1 : XLogFlush(local_lsn);
1068 : :
1069 : 8 : return remote_lsn;
1070 : : }
1071 : :
1072 : : /*
1073 : : * Tear down a (possibly) configured session replication origin during process
1074 : : * exit.
1075 : : */
1076 : : static void
1077 : 471 : ReplicationOriginExitCleanup(int code, Datum arg)
1078 : : {
3047 tgl@sss.pgh.pa.us 1079 : 471 : ConditionVariable *cv = NULL;
1080 : :
702 alvherre@alvh.no-ip. 1081 [ + + ]: 471 : if (session_replication_state == NULL)
1082 : 189 : return;
1083 : :
3885 andres@anarazel.de 1084 : 282 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1085 : :
702 alvherre@alvh.no-ip. 1086 [ + + ]: 282 : if (session_replication_state->acquired_by == MyProcPid)
1087 : : {
3053 1088 : 272 : cv = &session_replication_state->origin_cv;
1089 : :
3885 andres@anarazel.de 1090 : 272 : session_replication_state->acquired_by = 0;
1091 : 272 : session_replication_state = NULL;
1092 : : }
1093 : :
1094 : 282 : LWLockRelease(ReplicationOriginLock);
1095 : :
3053 alvherre@alvh.no-ip. 1096 [ + + ]: 282 : if (cv)
1097 : 272 : ConditionVariableBroadcast(cv);
1098 : : }
1099 : :
1100 : : /*
1101 : : * Setup a replication origin in the shared memory struct if it doesn't
1102 : : * already exist and cache access to the specific ReplicationSlot so the
1103 : : * array doesn't have to be searched when calling
1104 : : * replorigin_session_advance().
1105 : : *
1106 : : * Normally only one such cached origin can exist per process so the cached
1107 : : * value can only be set again after the previous value is torn down with
1108 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1109 : : * (meaning the slot is not allowed to be already acquired by another process).
1110 : : *
1111 : : * However, sometimes multiple processes can safely re-use the same origin slot
1112 : : * (for example, multiple parallel apply processes can safely use the same
1113 : : * origin, provided they maintain commit order by allowing only one process to
1114 : : * commit at a time). For this case the first process must pass acquired_by =
1115 : : * 0, and then the other processes sharing that same origin can pass
1116 : : * acquired_by = PID of the first process.
1117 : : */
1118 : : void
1073 akapila@postgresql.o 1119 : 475 : replorigin_session_setup(RepOriginId node, int acquired_by)
1120 : : {
1121 : : static bool registered_cleanup;
1122 : : int i;
3861 bruce@momjian.us 1123 : 475 : int free_slot = -1;
1124 : :
3885 andres@anarazel.de 1125 [ + + ]: 475 : if (!registered_cleanup)
1126 : : {
1127 : 471 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1128 : 471 : registered_cleanup = true;
1129 : : }
1130 : :
271 msawada@postgresql.o 1131 [ - + ]: 475 : Assert(max_active_replication_origins > 0);
1132 : :
3885 andres@anarazel.de 1133 [ + + ]: 475 : if (session_replication_state != NULL)
1134 [ + - ]: 1 : ereport(ERROR,
1135 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1136 : : errmsg("cannot setup replication origin when one is already setup")));
1137 : :
1138 : : /* Lock exclusively, as we may have to create a new table entry. */
1139 : 474 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1140 : :
1141 : : /*
1142 : : * Search for either an existing slot for the origin, or a free one we can
1143 : : * use.
1144 : : */
271 msawada@postgresql.o 1145 [ + + ]: 1939 : for (i = 0; i < max_active_replication_origins; i++)
1146 : : {
3885 andres@anarazel.de 1147 : 1821 : ReplicationState *curstate = &replication_states[i];
1148 : :
1149 : : /* remember where to insert if necessary */
1150 [ + + + + ]: 1821 : if (curstate->roident == InvalidRepOriginId &&
1151 : : free_slot == -1)
1152 : : {
1153 : 120 : free_slot = i;
1154 : 120 : continue;
1155 : : }
1156 : :
1157 : : /* not our slot */
1158 [ + + ]: 1701 : if (curstate->roident != node)
1159 : 1345 : continue;
1160 : :
1073 akapila@postgresql.o 1161 [ + + - + ]: 356 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1162 : : {
3885 andres@anarazel.de 1163 [ # # ]:LBC (1) : ereport(ERROR,
1164 : : (errcode(ERRCODE_OBJECT_IN_USE),
1165 : : errmsg("replication origin with ID %d is already active for PID %d",
1166 : : curstate->roident, curstate->acquired_by)));
1167 : : }
1168 : :
89 akapila@postgresql.o 1169 [ - + ]:GNC 356 : else if (curstate->acquired_by != acquired_by)
1170 : : {
89 akapila@postgresql.o 1171 [ # # ]:UNC 0 : ereport(ERROR,
1172 : : (errcode(ERRCODE_OBJECT_IN_USE),
1173 : : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1174 : : node, acquired_by)));
1175 : : }
1176 : :
1177 : : /* ok, found slot */
3885 andres@anarazel.de 1178 :CBC 356 : session_replication_state = curstate;
756 akapila@postgresql.o 1179 : 356 : break;
1180 : : }
1181 : :
1182 : :
3885 andres@anarazel.de 1183 [ + + - + ]: 474 : if (session_replication_state == NULL && free_slot == -1)
3885 andres@anarazel.de 1184 [ # # ]:UBC 0 : ereport(ERROR,
1185 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1186 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1187 : : node),
1188 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
3885 andres@anarazel.de 1189 [ + + ]:CBC 474 : else if (session_replication_state == NULL)
1190 : : {
89 akapila@postgresql.o 1191 [ + + ]:GNC 118 : if (acquired_by)
1192 [ + - ]: 1 : ereport(ERROR,
1193 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1194 : : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1195 : : acquired_by, node)));
1196 : :
1197 : : /* initialize new slot */
3885 andres@anarazel.de 1198 :CBC 117 : session_replication_state = &replication_states[free_slot];
41 alvherre@kurilemu.de 1199 [ - + ]:GNC 117 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1200 [ - + ]: 117 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
3885 andres@anarazel.de 1201 :CBC 117 : session_replication_state->roident = node;
1202 : : }
1203 : :
1204 : :
1205 [ - + ]: 473 : Assert(session_replication_state->roident != InvalidRepOriginId);
1206 : :
1073 akapila@postgresql.o 1207 [ + + ]: 473 : if (acquired_by == 0)
1208 : 462 : session_replication_state->acquired_by = MyProcPid;
1209 : : else
89 akapila@postgresql.o 1210 [ - + ]:GNC 11 : Assert(session_replication_state->acquired_by == acquired_by);
1211 : :
3885 andres@anarazel.de 1212 :CBC 473 : LWLockRelease(ReplicationOriginLock);
1213 : :
1214 : : /* probably this one is pointless */
3053 alvherre@alvh.no-ip. 1215 : 473 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
3885 andres@anarazel.de 1216 : 473 : }
1217 : :
1218 : : /*
1219 : : * Reset replay state previously setup in this session.
1220 : : *
1221 : : * This function may only be called if an origin was setup with
1222 : : * replorigin_session_setup().
1223 : : */
1224 : : void
1225 : 192 : replorigin_session_reset(void)
1226 : : {
1227 : : ConditionVariable *cv;
1228 : :
271 msawada@postgresql.o 1229 [ - + ]: 192 : Assert(max_active_replication_origins != 0);
1230 : :
3885 andres@anarazel.de 1231 [ + + ]: 192 : if (session_replication_state == NULL)
1232 [ + - ]: 1 : ereport(ERROR,
1233 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 : : errmsg("no replication origin is configured")));
1235 : :
1236 : 191 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1237 : :
1238 : 191 : session_replication_state->acquired_by = 0;
3053 alvherre@alvh.no-ip. 1239 : 191 : cv = &session_replication_state->origin_cv;
3885 andres@anarazel.de 1240 : 191 : session_replication_state = NULL;
1241 : :
1242 : 191 : LWLockRelease(ReplicationOriginLock);
1243 : :
3053 alvherre@alvh.no-ip. 1244 : 191 : ConditionVariableBroadcast(cv);
3885 andres@anarazel.de 1245 : 191 : }
1246 : :
1247 : : /*
1248 : : * Do the same work replorigin_advance() does, just on the session's
1249 : : * configured origin.
1250 : : *
1251 : : * This is noticeably cheaper than using replorigin_advance().
1252 : : */
1253 : : void
1254 : 1101 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1255 : : {
1256 [ - + ]: 1101 : Assert(session_replication_state != NULL);
1257 [ - + ]: 1101 : Assert(session_replication_state->roident != InvalidRepOriginId);
1258 : :
1259 : 1101 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1260 [ + - ]: 1101 : if (session_replication_state->local_lsn < local_commit)
1261 : 1101 : session_replication_state->local_lsn = local_commit;
1262 [ + + ]: 1101 : if (session_replication_state->remote_lsn < remote_commit)
1263 : 514 : session_replication_state->remote_lsn = remote_commit;
1264 : 1101 : LWLockRelease(&session_replication_state->lock);
1265 : 1101 : }
1266 : :
1267 : : /*
1268 : : * Ask the machinery about the point up to which we successfully replayed
1269 : : * changes from an already setup replication origin.
1270 : : */
1271 : : XLogRecPtr
1272 : 262 : replorigin_session_get_progress(bool flush)
1273 : : {
1274 : : XLogRecPtr remote_lsn;
1275 : : XLogRecPtr local_lsn;
1276 : :
1277 [ - + ]: 262 : Assert(session_replication_state != NULL);
1278 : :
1279 : 262 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1280 : 262 : remote_lsn = session_replication_state->remote_lsn;
1281 : 262 : local_lsn = session_replication_state->local_lsn;
1282 : 262 : LWLockRelease(&session_replication_state->lock);
1283 : :
41 alvherre@kurilemu.de 1284 [ + + + - ]:GNC 262 : if (flush && XLogRecPtrIsValid(local_lsn))
3885 andres@anarazel.de 1285 :CBC 1 : XLogFlush(local_lsn);
1286 : :
1287 : 262 : return remote_lsn;
1288 : : }
1289 : :
1290 : :
1291 : :
1292 : : /* ---------------------------------------------------------------------------
1293 : : * SQL functions for working with replication origin.
1294 : : *
1295 : : * These mostly should be fairly short wrappers around more generic functions.
1296 : : * ---------------------------------------------------------------------------
1297 : : */
1298 : :
1299 : : /*
1300 : : * Create replication origin for the passed in name, and return the assigned
1301 : : * oid.
1302 : : */
1303 : : Datum
1304 : 13 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1305 : : {
1306 : : char *name;
1307 : : RepOriginId roident;
1308 : :
1309 : 13 : replorigin_check_prerequisites(false, false);
1310 : :
1311 : 13 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1312 : :
1313 : : /*
1314 : : * Replication origins "any and "none" are reserved for system options.
1315 : : * The origins "pg_xxx" are reserved for internal use.
1316 : : */
1245 akapila@postgresql.o 1317 [ + + + + ]: 13 : if (IsReservedName(name) || IsReservedOriginName(name))
2363 tgl@sss.pgh.pa.us 1318 [ + - ]: 3 : ereport(ERROR,
1319 : : (errcode(ERRCODE_RESERVED_NAME),
1320 : : errmsg("replication origin name \"%s\" is reserved",
1321 : : name),
1322 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1323 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1324 : :
1325 : : /*
1326 : : * If built with appropriate switch, whine when regression-testing
1327 : : * conventions for replication origin names are violated.
1328 : : */
1329 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1330 : : if (strncmp(name, "regress_", 8) != 0)
1331 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1332 : : #endif
1333 : :
3885 andres@anarazel.de 1334 : 10 : roident = replorigin_create(name);
1335 : :
1336 : 6 : pfree(name);
1337 : :
1338 : 6 : PG_RETURN_OID(roident);
1339 : : }
1340 : :
1341 : : /*
1342 : : * Drop replication origin.
1343 : : */
1344 : : Datum
1345 : 8 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1346 : : {
1347 : : char *name;
1348 : :
1349 : 8 : replorigin_check_prerequisites(false, false);
1350 : :
1351 : 8 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1352 : :
1771 akapila@postgresql.o 1353 : 8 : replorigin_drop_by_name(name, false, true);
1354 : :
3885 andres@anarazel.de 1355 : 7 : pfree(name);
1356 : :
1357 : 7 : PG_RETURN_VOID();
1358 : : }
1359 : :
1360 : : /*
1361 : : * Return oid of a replication origin.
1362 : : */
1363 : : Datum
3885 andres@anarazel.de 1364 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1365 : : {
1366 : : char *name;
1367 : : RepOriginId roident;
1368 : :
1369 : 0 : replorigin_check_prerequisites(false, false);
1370 : :
1371 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1372 : 0 : roident = replorigin_by_name(name, true);
1373 : :
1374 : 0 : pfree(name);
1375 : :
1376 [ # # ]: 0 : if (OidIsValid(roident))
1377 : 0 : PG_RETURN_OID(roident);
1378 : 0 : PG_RETURN_NULL();
1379 : : }
1380 : :
1381 : : /*
1382 : : * Setup a replication origin for this session.
1383 : : */
1384 : : Datum
3885 andres@anarazel.de 1385 :CBC 9 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1386 : : {
1387 : : char *name;
1388 : : RepOriginId origin;
1389 : : int pid;
1390 : :
1391 : 9 : replorigin_check_prerequisites(true, false);
1392 : :
1393 : 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1394 : 9 : origin = replorigin_by_name(name, false);
89 akapila@postgresql.o 1395 :GNC 8 : pid = PG_GETARG_INT32(1);
1396 : 8 : replorigin_session_setup(origin, pid);
1397 : :
3733 alvherre@alvh.no-ip. 1398 :CBC 6 : replorigin_session_origin = origin;
1399 : :
3885 andres@anarazel.de 1400 : 6 : pfree(name);
1401 : :
1402 : 6 : PG_RETURN_VOID();
1403 : : }
1404 : :
1405 : : /*
1406 : : * Reset previously setup origin in this session
1407 : : */
1408 : : Datum
1409 : 7 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1410 : : {
1411 : 7 : replorigin_check_prerequisites(true, false);
1412 : :
1413 : 7 : replorigin_session_reset();
1414 : :
3733 alvherre@alvh.no-ip. 1415 : 6 : replorigin_session_origin = InvalidRepOriginId;
1416 : 6 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1417 : 6 : replorigin_session_origin_timestamp = 0;
1418 : :
3885 andres@anarazel.de 1419 : 6 : PG_RETURN_VOID();
1420 : : }
1421 : :
1422 : : /*
1423 : : * Has a replication origin been setup for this session.
1424 : : */
1425 : : Datum
3885 andres@anarazel.de 1426 :GBC 2 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1427 : : {
1428 : 2 : replorigin_check_prerequisites(false, false);
1429 : :
3733 alvherre@alvh.no-ip. 1430 : 2 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1431 : : }
1432 : :
1433 : :
1434 : : /*
1435 : : * Return the replication progress for origin setup in the current session.
1436 : : *
1437 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1438 : : * to a local transaction that has been flushed. This is useful if asynchronous
1439 : : * commits are used when replaying replicated transactions.
1440 : : */
1441 : : Datum
3885 andres@anarazel.de 1442 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1443 : : {
1444 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1445 : 2 : bool flush = PG_GETARG_BOOL(0);
1446 : :
1447 : 2 : replorigin_check_prerequisites(true, false);
1448 : :
1449 [ - + ]: 2 : if (session_replication_state == NULL)
3885 andres@anarazel.de 1450 [ # # ]:UBC 0 : ereport(ERROR,
1451 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1452 : : errmsg("no replication origin is configured")));
1453 : :
3885 andres@anarazel.de 1454 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1455 : :
41 alvherre@kurilemu.de 1456 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
3885 andres@anarazel.de 1457 :UBC 0 : PG_RETURN_NULL();
1458 : :
3885 andres@anarazel.de 1459 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1460 : : }
1461 : :
1462 : : Datum
1463 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1464 : : {
1465 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1466 : :
1467 : 1 : replorigin_check_prerequisites(true, false);
1468 : :
1469 [ - + ]: 1 : if (session_replication_state == NULL)
3885 andres@anarazel.de 1470 [ # # ]:UBC 0 : ereport(ERROR,
1471 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1472 : : errmsg("no replication origin is configured")));
1473 : :
3733 alvherre@alvh.no-ip. 1474 :CBC 1 : replorigin_session_origin_lsn = location;
1475 : 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1476 : :
3885 andres@anarazel.de 1477 : 1 : PG_RETURN_VOID();
1478 : : }
1479 : :
1480 : : Datum
3885 andres@anarazel.de 1481 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1482 : : {
1483 : 0 : replorigin_check_prerequisites(true, false);
1484 : :
3733 alvherre@alvh.no-ip. 1485 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1486 : 0 : replorigin_session_origin_timestamp = 0;
1487 : :
3885 andres@anarazel.de 1488 : 0 : PG_RETURN_VOID();
1489 : : }
1490 : :
1491 : :
1492 : : Datum
3885 andres@anarazel.de 1493 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1494 : : {
3202 noah@leadboat.com 1495 : 3 : text *name = PG_GETARG_TEXT_PP(0);
3861 bruce@momjian.us 1496 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1497 : : RepOriginId node;
1498 : :
3885 andres@anarazel.de 1499 : 3 : replorigin_check_prerequisites(true, false);
1500 : :
1501 : : /* lock to prevent the replication origin from vanishing */
1502 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1503 : :
1504 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1505 : :
1506 : : /*
1507 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1508 : : * xact hasn't committed yet. This is why this function should be used to
1509 : : * set up the initial replication state, but not for replay.
1510 : : */
1511 : 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1512 : : true /* go backward */ , true /* WAL log */ );
1513 : :
1514 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1515 : :
1516 : 2 : PG_RETURN_VOID();
1517 : : }
1518 : :
1519 : :
1520 : : /*
1521 : : * Return the replication progress for an individual replication origin.
1522 : : *
1523 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1524 : : * to a local transaction that has been flushed. This is useful if asynchronous
1525 : : * commits are used when replaying replicated transactions.
1526 : : */
1527 : : Datum
1528 : 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1529 : : {
1530 : : char *name;
1531 : : bool flush;
1532 : : RepOriginId roident;
1533 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1534 : :
1535 : 3 : replorigin_check_prerequisites(true, true);
1536 : :
1537 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1538 : 3 : flush = PG_GETARG_BOOL(1);
1539 : :
1540 : 3 : roident = replorigin_by_name(name, false);
1541 [ - + ]: 2 : Assert(OidIsValid(roident));
1542 : :
1543 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1544 : :
41 alvherre@kurilemu.de 1545 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
3885 andres@anarazel.de 1546 :UBC 0 : PG_RETURN_NULL();
1547 : :
3885 andres@anarazel.de 1548 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1549 : : }
1550 : :
1551 : :
1552 : : Datum
1553 : 10 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1554 : : {
1555 : 10 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1556 : : int i;
1557 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1558 : :
1559 : : /* we want to return 0 rows if slot is set to zero */
1560 : 10 : replorigin_check_prerequisites(false, true);
1561 : :
1156 michael@paquier.xyz 1562 : 10 : InitMaterializedSRF(fcinfo, 0);
1563 : :
1564 : : /* prevent slots from being concurrently dropped */
3885 andres@anarazel.de 1565 : 10 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1566 : :
1567 : : /*
1568 : : * Iterate through all possible replication_states, display if they are
1569 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1570 : : * of date values are a possibility.
1571 : : */
271 msawada@postgresql.o 1572 [ + + ]: 110 : for (i = 0; i < max_active_replication_origins; i++)
1573 : : {
1574 : : ReplicationState *state;
1575 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1576 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1577 : : char *roname;
1578 : :
3885 andres@anarazel.de 1579 : 100 : state = &replication_states[i];
1580 : :
1581 : : /* unused slot, nothing to display */
1582 [ + + ]: 100 : if (state->roident == InvalidRepOriginId)
1583 : 87 : continue;
1584 : :
1585 : 13 : memset(values, 0, sizeof(values));
1586 : 13 : memset(nulls, 1, sizeof(nulls));
1587 : :
1588 : 13 : values[0] = ObjectIdGetDatum(state->roident);
1589 : 13 : nulls[0] = false;
1590 : :
1591 : : /*
1592 : : * We're not preventing the origin to be dropped concurrently, so
1593 : : * silently accept that it might be gone.
1594 : : */
1595 [ + - ]: 13 : if (replorigin_by_oid(state->roident, true,
1596 : : &roname))
1597 : : {
1598 : 13 : values[1] = CStringGetTextDatum(roname);
1599 : 13 : nulls[1] = false;
1600 : : }
1601 : :
1602 : 13 : LWLockAcquire(&state->lock, LW_SHARED);
1603 : :
3861 bruce@momjian.us 1604 : 13 : values[2] = LSNGetDatum(state->remote_lsn);
3885 andres@anarazel.de 1605 : 13 : nulls[2] = false;
1606 : :
1607 : 13 : values[3] = LSNGetDatum(state->local_lsn);
1608 : 13 : nulls[3] = false;
1609 : :
1610 : 13 : LWLockRelease(&state->lock);
1611 : :
1381 michael@paquier.xyz 1612 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1613 : : values, nulls);
1614 : : }
1615 : :
3885 andres@anarazel.de 1616 : 10 : LWLockRelease(ReplicationOriginLock);
1617 : :
1618 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1619 : :
1620 : 10 : return (Datum) 0;
1621 : : }
|