Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to efficiently store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consist out of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origin have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allow us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can setup one replication origin that's from then used as
38 : : * the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
47 : : * * To create and drop replication origins an exclusive lock on
48 : : * pg_replication_slot is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not setup as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/guc.h"
94 : : #include "utils/pg_lsn.h"
95 : : #include "utils/rel.h"
96 : : #include "utils/snapmgr.h"
97 : : #include "utils/syscache.h"
98 : :
99 : : /* paths for replication origin checkpoint files */
100 : : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 : :
103 : : /* GUC variables */
104 : : int max_active_replication_origins = 10;
105 : :
106 : : /*
107 : : * Replay progress of a single remote node.
108 : : */
109 : : typedef struct ReplicationState
110 : : {
111 : : /*
112 : : * Local identifier for the remote node.
113 : : */
114 : : RepOriginId roident;
115 : :
116 : : /*
117 : : * Location of the latest commit from the remote side.
118 : : */
119 : : XLogRecPtr remote_lsn;
120 : :
121 : : /*
122 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : : * during a checkpoint so we know the commit record actually is safe on
124 : : * disk.
125 : : */
126 : : XLogRecPtr local_lsn;
127 : :
128 : : /*
129 : : * PID of backend that's acquired slot, or 0 if none.
130 : : */
131 : : int acquired_by;
132 : :
133 : : /*
134 : : * Condition variable that's signaled when acquired_by changes.
135 : : */
136 : : ConditionVariable origin_cv;
137 : :
138 : : /*
139 : : * Lock protecting remote_lsn and local_lsn.
140 : : */
141 : : LWLock lock;
142 : : } ReplicationState;
143 : :
144 : : /*
145 : : * On disk version of ReplicationState.
146 : : */
147 : : typedef struct ReplicationStateOnDisk
148 : : {
149 : : RepOriginId roident;
150 : : XLogRecPtr remote_lsn;
151 : : } ReplicationStateOnDisk;
152 : :
153 : :
154 : : typedef struct ReplicationStateCtl
155 : : {
156 : : /* Tranche to use for per-origin LWLocks */
157 : : int tranche_id;
158 : : /* Array of length max_active_replication_origins */
159 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
160 : : } ReplicationStateCtl;
161 : :
162 : : /* external variables */
163 : : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164 : : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
165 : : TimestampTz replorigin_session_origin_timestamp = 0;
166 : :
167 : : /*
168 : : * Base address into a shared memory array of replication states of size
169 : : * max_active_replication_origins.
170 : : */
171 : : static ReplicationState *replication_states;
172 : :
173 : : /*
174 : : * Actual shared memory block (replication_states[] is now part of this).
175 : : */
176 : : static ReplicationStateCtl *replication_states_ctl;
177 : :
178 : : /*
179 : : * We keep a pointer to this backend's ReplicationState to avoid having to
180 : : * search the replication_states array in replorigin_session_advance for each
181 : : * remote commit. (Ownership of a backend's own entry can only be changed by
182 : : * that backend.)
183 : : */
184 : : static ReplicationState *session_replication_state = NULL;
185 : :
186 : : /* Magic for on disk files. */
187 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 : :
189 : : static void
221 msawada@postgresql.o 190 :CBC 58 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : : {
192 [ + + - + ]: 58 : if (check_origins && max_active_replication_origins == 0)
3835 andres@anarazel.de 193 [ # # ]:UBC 0 : ereport(ERROR,
194 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 : :
3835 andres@anarazel.de 197 [ + + - + ]:CBC 58 : if (!recoveryOK && RecoveryInProgress())
3835 andres@anarazel.de 198 [ # # ]:UBC 0 : ereport(ERROR,
199 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : : errmsg("cannot manipulate replication origins during recovery")));
3835 andres@anarazel.de 201 :CBC 58 : }
202 : :
203 : :
204 : : /*
205 : : * IsReservedOriginName
206 : : * True iff name is either "none" or "any".
207 : : */
208 : : static bool
1195 akapila@postgresql.o 209 : 12 : IsReservedOriginName(const char *name)
210 : : {
211 [ + + + + ]: 23 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 : 11 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : : }
214 : :
215 : : /* ---------------------------------------------------------------------------
216 : : * Functions for working with replication origins themselves.
217 : : * ---------------------------------------------------------------------------
218 : : */
219 : :
220 : : /*
221 : : * Check for a persistent replication origin identified by name.
222 : : *
223 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : : */
225 : : RepOriginId
1601 peter@eisentraut.org 226 : 978 : replorigin_by_name(const char *roname, bool missing_ok)
227 : : {
228 : : Form_pg_replication_origin ident;
3811 bruce@momjian.us 229 : 978 : Oid roident = InvalidOid;
230 : : HeapTuple tuple;
231 : : Datum roname_d;
232 : :
3835 andres@anarazel.de 233 : 978 : roname_d = CStringGetTextDatum(roname);
234 : :
235 : 978 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 [ + + ]: 978 : if (HeapTupleIsValid(tuple))
237 : : {
238 : 592 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 : 592 : roident = ident->roident;
240 : 592 : ReleaseSysCache(tuple);
241 : : }
242 [ + + ]: 386 : else if (!missing_ok)
2945 rhaas@postgresql.org 243 [ + - ]: 4 : ereport(ERROR,
244 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : : errmsg("replication origin \"%s\" does not exist",
246 : : roname)));
247 : :
3835 andres@anarazel.de 248 : 974 : return roident;
249 : : }
250 : :
251 : : /*
252 : : * Create a replication origin.
253 : : *
254 : : * Needs to be called in a transaction.
255 : : */
256 : : RepOriginId
1601 peter@eisentraut.org 257 : 373 : replorigin_create(const char *roname)
258 : : {
259 : : Oid roident;
3811 bruce@momjian.us 260 : 373 : HeapTuple tuple = NULL;
261 : : Relation rel;
262 : : Datum roname_d;
263 : : SnapshotData SnapshotDirty;
264 : : SysScanDesc scan;
265 : : ScanKeyData key;
266 : :
267 : : /*
268 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
269 : : * replication origin names to 512 bytes. This should be more than enough
270 : : * for all practical use.
271 : : */
174 nathan@postgresql.or 272 [ + + ]: 373 : if (strlen(roname) > MAX_RONAME_LEN)
273 [ + - ]: 3 : ereport(ERROR,
274 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 : : errmsg("replication origin name is too long"),
276 : : errdetail("Replication origin names must be no longer than %d bytes.",
277 : : MAX_RONAME_LEN)));
278 : :
3835 andres@anarazel.de 279 : 370 : roname_d = CStringGetTextDatum(roname);
280 : :
281 [ - + ]: 370 : Assert(IsTransactionState());
282 : :
283 : : /*
284 : : * We need the numeric replication origin to be 16bit wide, so we cannot
285 : : * rely on the normal oid allocation. Instead we simply scan
286 : : * pg_replication_origin for the first unused id. That's not particularly
287 : : * efficient, but this should be a fairly infrequent operation - we can
288 : : * easily spend a bit more code on this when it turns out it needs to be
289 : : * faster.
290 : : *
291 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
292 : : * over the table for the duration of the search. Because we use a "dirty
293 : : * snapshot" we can read rows that other in-progress sessions have
294 : : * written, even though they would be invisible with normal snapshots. Due
295 : : * to the exclusive lock there's no danger that new rows can appear while
296 : : * we're checking.
297 : : */
298 : 370 : InitDirtySnapshot(SnapshotDirty);
299 : :
2472 300 : 370 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301 : :
302 : : /*
303 : : * We want to be able to access pg_replication_origin without setting up a
304 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
305 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
307 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
308 : : * be sure to set up a snapshot everywhere it might be needed. For more
309 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 : : */
174 nathan@postgresql.or 311 [ - + ]: 370 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312 : :
3834 andres@anarazel.de 313 [ + - ]: 675 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 : : {
315 : : bool nulls[Natts_pg_replication_origin];
316 : : Datum values[Natts_pg_replication_origin];
317 : : bool collides;
318 : :
3835 319 [ - + ]: 675 : CHECK_FOR_INTERRUPTS();
320 : :
321 : 675 : ScanKeyInit(&key,
322 : : Anum_pg_replication_origin_roident,
323 : : BTEqualStrategyNumber, F_OIDEQ,
324 : : ObjectIdGetDatum(roident));
325 : :
326 : 675 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 : : true /* indexOK */ ,
328 : : &SnapshotDirty,
329 : : 1, &key);
330 : :
331 : 675 : collides = HeapTupleIsValid(systable_getnext(scan));
332 : :
333 : 675 : systable_endscan(scan);
334 : :
335 [ + + ]: 675 : if (!collides)
336 : : {
337 : : /*
338 : : * Ok, found an unused roident, insert the new row and do a CCI,
339 : : * so our callers can look it up if they want to.
340 : : */
341 : 370 : memset(&nulls, 0, sizeof(nulls));
342 : :
3811 bruce@momjian.us 343 : 370 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
3835 andres@anarazel.de 344 : 370 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
345 : :
346 : 370 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
3192 alvherre@alvh.no-ip. 347 : 370 : CatalogTupleInsert(rel, tuple);
3835 andres@anarazel.de 348 : 369 : CommandCounterIncrement();
349 : 369 : break;
350 : : }
351 : : }
352 : :
353 : : /* now release lock again, */
2472 354 : 369 : table_close(rel, ExclusiveLock);
355 : :
3835 356 [ - + ]: 369 : if (tuple == NULL)
3835 andres@anarazel.de 357 [ # # ]:UBC 0 : ereport(ERROR,
358 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : : errmsg("could not find free replication origin ID")));
360 : :
3835 andres@anarazel.de 361 :CBC 369 : heap_freetuple(tuple);
362 : 369 : return roident;
363 : : }
364 : :
365 : : /*
366 : : * Helper function to drop a replication origin.
367 : : */
368 : : static void
998 akapila@postgresql.o 369 : 304 : replorigin_state_clear(RepOriginId roident, bool nowait)
370 : : {
371 : : int i;
372 : :
373 : : /*
374 : : * Clean up the slot state info, if there is any matching slot.
375 : : */
3003 alvherre@alvh.no-ip. 376 : 306 : restart:
3835 andres@anarazel.de 377 : 306 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378 : :
221 msawada@postgresql.o 379 [ + + ]: 1016 : for (i = 0; i < max_active_replication_origins; i++)
380 : : {
3835 andres@anarazel.de 381 : 971 : ReplicationState *state = &replication_states[i];
382 : :
383 [ + + ]: 971 : if (state->roident == roident)
384 : : {
385 : : /* found our slot, is it busy? */
386 [ + + ]: 261 : if (state->acquired_by != 0)
387 : : {
388 : : ConditionVariable *cv;
389 : :
3003 alvherre@alvh.no-ip. 390 [ - + ]:GBC 2 : if (nowait)
3003 alvherre@alvh.no-ip. 391 [ # # ]:UBC 0 : ereport(ERROR,
392 : : (errcode(ERRCODE_OBJECT_IN_USE),
393 : : errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 : : state->roident,
395 : : state->acquired_by)));
396 : :
397 : : /*
398 : : * We must wait and then retry. Since we don't know which CV
399 : : * to wait on until here, we can't readily use
400 : : * ConditionVariablePrepareToSleep (calling it here would be
401 : : * wrong, since we could miss the signal if we did so); just
402 : : * use ConditionVariableSleep directly.
403 : : */
3003 alvherre@alvh.no-ip. 404 :GBC 2 : cv = &state->origin_cv;
405 : :
406 : 2 : LWLockRelease(ReplicationOriginLock);
407 : :
408 : 2 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 : 2 : goto restart;
410 : : }
411 : :
412 : : /* first make a WAL log entry */
413 : : {
414 : : xl_replorigin_drop xlrec;
415 : :
3835 andres@anarazel.de 416 :CBC 259 : xlrec.node_id = roident;
417 : 259 : XLogBeginInsert();
259 peter@eisentraut.org 418 : 259 : XLogRegisterData(&xlrec, sizeof(xlrec));
3835 andres@anarazel.de 419 : 259 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 : : }
421 : :
422 : : /* then clear the in-memory slot */
423 : 259 : state->roident = InvalidRepOriginId;
424 : 259 : state->remote_lsn = InvalidXLogRecPtr;
425 : 259 : state->local_lsn = InvalidXLogRecPtr;
426 : 259 : break;
427 : : }
428 : : }
429 : 304 : LWLockRelease(ReplicationOriginLock);
2849 tgl@sss.pgh.pa.us 430 : 304 : ConditionVariableCancelSleep();
3835 andres@anarazel.de 431 : 304 : }
432 : :
433 : : /*
434 : : * Drop replication origin (by name).
435 : : *
436 : : * Needs to be called in a transaction.
437 : : */
438 : : void
1601 peter@eisentraut.org 439 : 492 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440 : : {
441 : : RepOriginId roident;
442 : : Relation rel;
443 : : HeapTuple tuple;
444 : :
1721 akapila@postgresql.o 445 [ - + ]: 492 : Assert(IsTransactionState());
446 : :
998 447 : 492 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448 : :
1721 449 : 492 : roident = replorigin_by_name(name, missing_ok);
450 : :
451 : : /* Lock the origin to prevent concurrent drops. */
998 452 : 491 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
453 : : AccessExclusiveLock);
454 : :
455 : 491 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 [ + + ]: 491 : if (!HeapTupleIsValid(tuple))
457 : : {
458 [ - + ]: 187 : if (!missing_ok)
998 akapila@postgresql.o 459 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 : : roident);
461 : :
462 : : /*
463 : : * We don't need to retain the locks if the origin is already dropped.
464 : : */
998 akapila@postgresql.o 465 :CBC 187 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
466 : : AccessExclusiveLock);
467 : 187 : table_close(rel, RowExclusiveLock);
468 : 187 : return;
469 : : }
470 : :
471 : 304 : replorigin_state_clear(roident, nowait);
472 : :
473 : : /*
474 : : * Now, we can delete the catalog entry.
475 : : */
476 : 304 : CatalogTupleDelete(rel, &tuple->t_self);
477 : 304 : ReleaseSysCache(tuple);
478 : :
479 : 304 : CommandCounterIncrement();
480 : :
481 : : /* We keep the lock on pg_replication_origin until commit */
1721 482 : 304 : table_close(rel, NoLock);
483 : : }
484 : :
485 : : /*
486 : : * Lookup replication origin via its oid and return the name.
487 : : *
488 : : * The external name is palloc'd in the calling context.
489 : : *
490 : : * Returns true if the origin is known, false otherwise.
491 : : */
492 : : bool
3835 andres@anarazel.de 493 : 27 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : : {
495 : : HeapTuple tuple;
496 : : Form_pg_replication_origin ric;
497 : :
498 [ - + ]: 27 : Assert(OidIsValid((Oid) roident));
499 [ - + ]: 27 : Assert(roident != InvalidRepOriginId);
500 [ - + ]: 27 : Assert(roident != DoNotReplicateId);
501 : :
502 : 27 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : : ObjectIdGetDatum((Oid) roident));
504 : :
505 [ + + ]: 27 : if (HeapTupleIsValid(tuple))
506 : : {
507 : 24 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 : 24 : *roname = text_to_cstring(&ric->roname);
509 : 24 : ReleaseSysCache(tuple);
510 : :
511 : 24 : return true;
512 : : }
513 : : else
514 : : {
515 : 3 : *roname = NULL;
516 : :
517 [ - + ]: 3 : if (!missing_ok)
2945 rhaas@postgresql.org 518 [ # # ]:UBC 0 : ereport(ERROR,
519 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : : errmsg("replication origin with ID %d does not exist",
521 : : roident)));
522 : :
3835 andres@anarazel.de 523 :CBC 3 : return false;
524 : : }
525 : : }
526 : :
527 : :
528 : : /* ---------------------------------------------------------------------------
529 : : * Functions for handling replication progress.
530 : : * ---------------------------------------------------------------------------
531 : : */
532 : :
533 : : Size
534 : 4045 : ReplicationOriginShmemSize(void)
535 : : {
536 : 4045 : Size size = 0;
537 : :
221 msawada@postgresql.o 538 [ + + ]: 4045 : if (max_active_replication_origins == 0)
3835 andres@anarazel.de 539 : 2 : return size;
540 : :
541 : 4043 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 : :
543 : 4043 : size = add_size(size,
544 : : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 : 4043 : return size;
546 : : }
547 : :
548 : : void
549 : 1049 : ReplicationOriginShmemInit(void)
550 : : {
551 : : bool found;
552 : :
221 msawada@postgresql.o 553 [ + + ]: 1049 : if (max_active_replication_origins == 0)
3835 andres@anarazel.de 554 : 1 : return;
555 : :
556 : 1048 : replication_states_ctl = (ReplicationStateCtl *)
557 : 1048 : ShmemInitStruct("ReplicationOriginState",
558 : : ReplicationOriginShmemSize(),
559 : : &found);
3811 bruce@momjian.us 560 : 1048 : replication_states = replication_states_ctl->states;
561 : :
3835 andres@anarazel.de 562 [ + - ]: 1048 : if (!found)
563 : : {
564 : : int i;
565 : :
1992 tgl@sss.pgh.pa.us 566 [ + - + - : 75393 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - + - +
+ ]
567 : :
568 : 1048 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 : :
221 msawada@postgresql.o 570 [ + + ]: 11519 : for (i = 0; i < max_active_replication_origins; i++)
571 : : {
3835 andres@anarazel.de 572 : 10471 : LWLockInitialize(&replication_states[i].lock,
573 : 10471 : replication_states_ctl->tranche_id);
3003 alvherre@alvh.no-ip. 574 : 10471 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : : }
576 : : }
577 : : }
578 : :
579 : : /* ---------------------------------------------------------------------------
580 : : * Perform a checkpoint of each replication origin's progress with respect to
581 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : : * if the transactions were originally committed asynchronously.
584 : : *
585 : : * We store checkpoints in the following format:
586 : : * +-------+------------------------+------------------+-----+--------+
587 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : : * +-------+------------------------+------------------+-----+--------+
589 : : *
590 : : * So its just the magic, followed by the statically sized
591 : : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : : * ReplicationState is determined by max_active_replication_origins.
593 : : * ---------------------------------------------------------------------------
594 : : */
595 : : void
3835 andres@anarazel.de 596 : 1701 : CheckPointReplicationOrigin(void)
597 : : {
424 michael@paquier.xyz 598 : 1701 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 : 1701 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : : int tmpfd;
601 : : int i;
3835 andres@anarazel.de 602 : 1701 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : : pg_crc32c crc;
604 : :
221 msawada@postgresql.o 605 [ + + ]: 1701 : if (max_active_replication_origins == 0)
3835 andres@anarazel.de 606 : 1 : return;
607 : :
608 : 1700 : INIT_CRC32C(crc);
609 : :
610 : : /* make sure no old temp file is remaining */
611 [ + - - + ]: 1700 : if (unlink(tmppath) < 0 && errno != ENOENT)
3835 andres@anarazel.de 612 [ # # ]:UBC 0 : ereport(PANIC,
613 : : (errcode_for_file_access(),
614 : : errmsg("could not remove file \"%s\": %m",
615 : : tmppath)));
616 : :
617 : : /*
618 : : * no other backend can perform this at the same time; only one checkpoint
619 : : * can happen at a time.
620 : : */
2957 peter_e@gmx.net 621 :CBC 1700 : tmpfd = OpenTransientFile(tmppath,
622 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3835 andres@anarazel.de 623 [ - + ]: 1700 : if (tmpfd < 0)
3835 andres@anarazel.de 624 [ # # ]:UBC 0 : ereport(PANIC,
625 : : (errcode_for_file_access(),
626 : : errmsg("could not create file \"%s\": %m",
627 : : tmppath)));
628 : :
629 : : /* write magic */
2641 michael@paquier.xyz 630 :CBC 1700 : errno = 0;
3835 andres@anarazel.de 631 [ - + ]: 1700 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : : {
633 : : /* if write didn't set errno, assume problem is no disk space */
2386 michael@paquier.xyz 634 [ # # ]:UBC 0 : if (errno == 0)
635 : 0 : errno = ENOSPC;
3835 andres@anarazel.de 636 [ # # ]: 0 : ereport(PANIC,
637 : : (errcode_for_file_access(),
638 : : errmsg("could not write to file \"%s\": %m",
639 : : tmppath)));
640 : : }
3835 andres@anarazel.de 641 :CBC 1700 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 : :
643 : : /* prevent concurrent creations/drops */
644 : 1700 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 : :
646 : : /* write actual data */
221 msawada@postgresql.o 647 [ + + ]: 18700 : for (i = 0; i < max_active_replication_origins; i++)
648 : : {
649 : : ReplicationStateOnDisk disk_state;
3835 andres@anarazel.de 650 : 17000 : ReplicationState *curstate = &replication_states[i];
651 : : XLogRecPtr local_lsn;
652 : :
653 [ + + ]: 17000 : if (curstate->roident == InvalidRepOriginId)
654 : 16952 : continue;
655 : :
656 : : /* zero, to avoid uninitialized padding bytes */
3110 657 : 48 : memset(&disk_state, 0, sizeof(disk_state));
658 : :
3835 659 : 48 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 : :
661 : 48 : disk_state.roident = curstate->roident;
662 : :
663 : 48 : disk_state.remote_lsn = curstate->remote_lsn;
664 : 48 : local_lsn = curstate->local_lsn;
665 : :
666 : 48 : LWLockRelease(&curstate->lock);
667 : :
668 : : /* make sure we only write out a commit that's persistent */
669 : 48 : XLogFlush(local_lsn);
670 : :
2641 michael@paquier.xyz 671 : 48 : errno = 0;
3835 andres@anarazel.de 672 [ - + ]: 48 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : : sizeof(disk_state))
674 : : {
675 : : /* if write didn't set errno, assume problem is no disk space */
2386 michael@paquier.xyz 676 [ # # ]:UBC 0 : if (errno == 0)
677 : 0 : errno = ENOSPC;
3835 andres@anarazel.de 678 [ # # ]: 0 : ereport(PANIC,
679 : : (errcode_for_file_access(),
680 : : errmsg("could not write to file \"%s\": %m",
681 : : tmppath)));
682 : : }
683 : :
3835 andres@anarazel.de 684 :CBC 48 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : : }
686 : :
687 : 1700 : LWLockRelease(ReplicationOriginLock);
688 : :
689 : : /* write out the CRC */
690 : 1700 : FIN_CRC32C(crc);
2641 michael@paquier.xyz 691 : 1700 : errno = 0;
3835 andres@anarazel.de 692 [ - + ]: 1700 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : : {
694 : : /* if write didn't set errno, assume problem is no disk space */
2386 michael@paquier.xyz 695 [ # # ]:UBC 0 : if (errno == 0)
696 : 0 : errno = ENOSPC;
3835 andres@anarazel.de 697 [ # # ]: 0 : ereport(PANIC,
698 : : (errcode_for_file_access(),
699 : : errmsg("could not write to file \"%s\": %m",
700 : : tmppath)));
701 : : }
702 : :
2306 peter@eisentraut.org 703 [ - + ]:CBC 1700 : if (CloseTransientFile(tmpfd) != 0)
2425 michael@paquier.xyz 704 [ # # ]:UBC 0 : ereport(PANIC,
705 : : (errcode_for_file_access(),
706 : : errmsg("could not close file \"%s\": %m",
707 : : tmppath)));
708 : :
709 : : /* fsync, rename to permanent file, fsync file and directory */
3520 andres@anarazel.de 710 :CBC 1700 : durable_rename(tmppath, path, PANIC);
711 : : }
712 : :
713 : : /*
714 : : * Recover replication replay status from checkpoint data saved earlier by
715 : : * CheckPointReplicationOrigin.
716 : : *
717 : : * This only needs to be called at startup and *not* during every checkpoint
718 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : : * state thereafter can be recovered by looking at commit records.
720 : : */
721 : : void
3835 722 : 907 : StartupReplicationOrigin(void)
723 : : {
424 michael@paquier.xyz 724 : 907 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : : int fd;
726 : : int readBytes;
3811 bruce@momjian.us 727 : 907 : uint32 magic = REPLICATION_STATE_MAGIC;
728 : 907 : int last_state = 0;
729 : : pg_crc32c file_crc;
730 : : pg_crc32c crc;
731 : :
732 : : /* don't want to overwrite already existing state */
733 : : #ifdef USE_ASSERT_CHECKING
734 : : static bool already_started = false;
735 : :
3835 andres@anarazel.de 736 [ - + ]: 907 : Assert(!already_started);
737 : 907 : already_started = true;
738 : : #endif
739 : :
221 msawada@postgresql.o 740 [ + + ]: 907 : if (max_active_replication_origins == 0)
3835 andres@anarazel.de 741 : 51 : return;
742 : :
743 : 906 : INIT_CRC32C(crc);
744 : :
745 [ + + ]: 906 : elog(DEBUG2, "starting up replication origin progress state");
746 : :
2957 peter_e@gmx.net 747 : 906 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 : :
749 : : /*
750 : : * might have had max_active_replication_origins == 0 last run, or we just
751 : : * brought up a standby.
752 : : */
3835 andres@anarazel.de 753 [ + + + - ]: 906 : if (fd < 0 && errno == ENOENT)
754 : 50 : return;
755 [ - + ]: 856 : else if (fd < 0)
3835 andres@anarazel.de 756 [ # # ]:UBC 0 : ereport(PANIC,
757 : : (errcode_for_file_access(),
758 : : errmsg("could not open file \"%s\": %m",
759 : : path)));
760 : :
761 : : /* verify magic, that is written even if nothing was active */
3835 andres@anarazel.de 762 :CBC 856 : readBytes = read(fd, &magic, sizeof(magic));
763 [ - + ]: 856 : if (readBytes != sizeof(magic))
764 : : {
2659 michael@paquier.xyz 765 [ # # ]:UBC 0 : if (readBytes < 0)
766 [ # # ]: 0 : ereport(PANIC,
767 : : (errcode_for_file_access(),
768 : : errmsg("could not read file \"%s\": %m",
769 : : path)));
770 : : else
771 [ # # ]: 0 : ereport(PANIC,
772 : : (errcode(ERRCODE_DATA_CORRUPTED),
773 : : errmsg("could not read file \"%s\": read %d of %zu",
774 : : path, readBytes, sizeof(magic))));
775 : : }
3835 andres@anarazel.de 776 :CBC 856 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 : :
778 [ - + ]: 856 : if (magic != REPLICATION_STATE_MAGIC)
3835 andres@anarazel.de 779 [ # # ]:UBC 0 : ereport(PANIC,
780 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : : magic, REPLICATION_STATE_MAGIC)));
782 : :
783 : : /* we can skip locking here, no other access is possible */
784 : :
785 : : /* recover individual states, until there are no more to be found */
786 : : while (true)
3835 andres@anarazel.de 787 :CBC 23 : {
788 : : ReplicationStateOnDisk disk_state;
789 : :
790 : 879 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 : :
792 : : /* no further data */
793 [ + + ]: 879 : if (readBytes == sizeof(crc))
794 : : {
795 : : /* not pretty, but simple ... */
3811 bruce@momjian.us 796 : 856 : file_crc = *(pg_crc32c *) &disk_state;
3835 andres@anarazel.de 797 : 856 : break;
798 : : }
799 : :
800 [ - + ]: 23 : if (readBytes < 0)
801 : : {
3835 andres@anarazel.de 802 [ # # ]:UBC 0 : ereport(PANIC,
803 : : (errcode_for_file_access(),
804 : : errmsg("could not read file \"%s\": %m",
805 : : path)));
806 : : }
807 : :
3835 andres@anarazel.de 808 [ - + ]:CBC 23 : if (readBytes != sizeof(disk_state))
809 : : {
3835 andres@anarazel.de 810 [ # # ]:UBC 0 : ereport(PANIC,
811 : : (errcode_for_file_access(),
812 : : errmsg("could not read file \"%s\": read %d of %zu",
813 : : path, readBytes, sizeof(disk_state))));
814 : : }
815 : :
3835 andres@anarazel.de 816 :CBC 23 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817 : :
221 msawada@postgresql.o 818 [ - + ]: 23 : if (last_state == max_active_replication_origins)
3835 andres@anarazel.de 819 [ # # ]:UBC 0 : ereport(PANIC,
820 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822 : :
823 : : /* copy data to shared memory */
3835 andres@anarazel.de 824 :CBC 23 : replication_states[last_state].roident = disk_state.roident;
825 : 23 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826 : 23 : last_state++;
827 : :
1789 peter@eisentraut.org 828 [ + - ]: 23 : ereport(LOG,
829 : : errmsg("recovered replication state of node %d to %X/%08X",
830 : : disk_state.roident,
831 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
832 : : }
833 : :
834 : : /* now check checksum */
3835 andres@anarazel.de 835 : 856 : FIN_CRC32C(crc);
836 [ - + ]: 856 : if (file_crc != crc)
3835 andres@anarazel.de 837 [ # # ]:UBC 0 : ereport(PANIC,
838 : : (errcode(ERRCODE_DATA_CORRUPTED),
839 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 : : crc, file_crc)));
841 : :
2306 peter@eisentraut.org 842 [ - + ]:CBC 856 : if (CloseTransientFile(fd) != 0)
2425 michael@paquier.xyz 843 [ # # ]:UBC 0 : ereport(PANIC,
844 : : (errcode_for_file_access(),
845 : : errmsg("could not close file \"%s\": %m",
846 : : path)));
847 : : }
848 : :
849 : : void
3835 andres@anarazel.de 850 :CBC 4 : replorigin_redo(XLogReaderState *record)
851 : : {
852 : 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853 : :
854 [ + + - ]: 4 : switch (info)
855 : : {
856 : 2 : case XLOG_REPLORIGIN_SET:
857 : : {
858 : 2 : xl_replorigin_set *xlrec =
893 tgl@sss.pgh.pa.us 859 : 2 : (xl_replorigin_set *) XLogRecGetData(record);
860 : :
3835 andres@anarazel.de 861 : 2 : replorigin_advance(xlrec->node_id,
862 : : xlrec->remote_lsn, record->EndRecPtr,
3811 bruce@momjian.us 863 : 2 : xlrec->force /* backward */ ,
864 : : false /* WAL log */ );
3835 andres@anarazel.de 865 : 2 : break;
866 : : }
867 : 2 : case XLOG_REPLORIGIN_DROP:
868 : : {
869 : : xl_replorigin_drop *xlrec;
870 : : int i;
871 : :
872 : 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873 : :
221 msawada@postgresql.o 874 [ + - ]: 3 : for (i = 0; i < max_active_replication_origins; i++)
875 : : {
3835 andres@anarazel.de 876 : 3 : ReplicationState *state = &replication_states[i];
877 : :
878 : : /* found our slot */
879 [ + + ]: 3 : if (state->roident == xlrec->node_id)
880 : : {
881 : : /* reset entry */
882 : 2 : state->roident = InvalidRepOriginId;
883 : 2 : state->remote_lsn = InvalidXLogRecPtr;
884 : 2 : state->local_lsn = InvalidXLogRecPtr;
885 : 2 : break;
886 : : }
887 : : }
888 : 2 : break;
889 : : }
3835 andres@anarazel.de 890 :UBC 0 : default:
891 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 : : }
3835 andres@anarazel.de 893 :CBC 4 : }
894 : :
895 : :
896 : : /*
897 : : * Tell the replication origin progress machinery that a commit from 'node'
898 : : * that originated at the LSN remote_commit on the remote node was replayed
899 : : * successfully and that we don't need to do so again. In combination with
900 : : * setting up replorigin_session_origin_lsn and replorigin_session_origin
901 : : * that ensures we won't lose knowledge about that after a crash if the
902 : : * transaction had a persistent effect (think of asynchronous commits).
903 : : *
904 : : * local_commit needs to be a local LSN of the commit so that we can make sure
905 : : * upon a checkpoint that enough WAL has been persisted to disk.
906 : : *
907 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908 : : * unless running in recovery.
909 : : */
910 : : void
911 : 238 : replorigin_advance(RepOriginId node,
912 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
913 : : bool go_backward, bool wal_log)
914 : : {
915 : : int i;
916 : 238 : ReplicationState *replication_state = NULL;
917 : 238 : ReplicationState *free_state = NULL;
918 : :
919 [ - + ]: 238 : Assert(node != InvalidRepOriginId);
920 : :
921 : : /* we don't track DoNotReplicateId */
922 [ - + ]: 238 : if (node == DoNotReplicateId)
3835 andres@anarazel.de 923 :UBC 0 : return;
924 : :
925 : : /*
926 : : * XXX: For the case where this is called by WAL replay, it'd be more
927 : : * efficient to restore into a backend local hashtable and only dump into
928 : : * shmem after recovery is finished. Let's wait with implementing that
929 : : * till it's shown to be a measurable expense
930 : : */
931 : :
932 : : /* Lock exclusively, as we may have to create a new table entry. */
3835 andres@anarazel.de 933 :CBC 238 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934 : :
935 : : /*
936 : : * Search for either an existing slot for the origin, or a free one we can
937 : : * use.
938 : : */
221 msawada@postgresql.o 939 [ + + ]: 2188 : for (i = 0; i < max_active_replication_origins; i++)
940 : : {
3835 andres@anarazel.de 941 : 1994 : ReplicationState *curstate = &replication_states[i];
942 : :
943 : : /* remember where to insert if necessary */
944 [ + + + + ]: 1994 : if (curstate->roident == InvalidRepOriginId &&
945 : : free_state == NULL)
946 : : {
947 : 195 : free_state = curstate;
948 : 195 : continue;
949 : : }
950 : :
951 : : /* not our slot */
952 [ + + ]: 1799 : if (curstate->roident != node)
953 : : {
954 : 1755 : continue;
955 : : }
956 : :
957 : : /* ok, found slot */
958 : 44 : replication_state = curstate;
959 : :
960 : 44 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961 : :
962 : : /* Make sure it's not used by somebody else */
963 [ - + ]: 44 : if (replication_state->acquired_by != 0)
964 : : {
3835 andres@anarazel.de 965 [ # # ]:UBC 0 : ereport(ERROR,
966 : : (errcode(ERRCODE_OBJECT_IN_USE),
967 : : errmsg("replication origin with ID %d is already active for PID %d",
968 : : replication_state->roident,
969 : : replication_state->acquired_by)));
970 : : }
971 : :
3835 andres@anarazel.de 972 :CBC 44 : break;
973 : : }
974 : :
975 [ + + - + ]: 238 : if (replication_state == NULL && free_state == NULL)
3835 andres@anarazel.de 976 [ # # ]:UBC 0 : ereport(ERROR,
977 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 : : errmsg("could not find free replication state slot for replication origin with ID %d",
979 : : node),
980 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
981 : :
3835 andres@anarazel.de 982 [ + + ]:CBC 238 : if (replication_state == NULL)
983 : : {
984 : : /* initialize new slot */
985 : 194 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986 : 194 : replication_state = free_state;
987 [ - + ]: 194 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988 [ - + ]: 194 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989 : 194 : replication_state->roident = node;
990 : : }
991 : :
992 [ - + ]: 238 : Assert(replication_state->roident != InvalidRepOriginId);
993 : :
994 : : /*
995 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 : : * and the standby gets the message. Primarily this will be called during
997 : : * WAL replay (of commit records) where no WAL logging is necessary.
998 : : */
999 [ + + ]: 238 : if (wal_log)
1000 : : {
1001 : : xl_replorigin_set xlrec;
1002 : :
1003 : 197 : xlrec.remote_lsn = remote_commit;
1004 : 197 : xlrec.node_id = node;
1005 : 197 : xlrec.force = go_backward;
1006 : :
1007 : 197 : XLogBeginInsert();
259 peter@eisentraut.org 1008 : 197 : XLogRegisterData(&xlrec, sizeof(xlrec));
1009 : :
3835 andres@anarazel.de 1010 : 197 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011 : : }
1012 : :
1013 : : /*
1014 : : * Due to - harmless - race conditions during a checkpoint we could see
1015 : : * values here that are older than the ones we already have in memory. We
1016 : : * could also see older values for prepared transactions when the prepare
1017 : : * is sent at a later point of time along with commit prepared and there
1018 : : * are other transactions commits between prepare and commit prepared. See
1019 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1020 : : */
1021 [ + + + + ]: 238 : if (go_backward || replication_state->remote_lsn < remote_commit)
1022 : 231 : replication_state->remote_lsn = remote_commit;
1023 [ + + + + ]: 238 : if (local_commit != InvalidXLogRecPtr &&
1024 [ + - ]: 38 : (go_backward || replication_state->local_lsn < local_commit))
1025 : 40 : replication_state->local_lsn = local_commit;
1026 : 238 : LWLockRelease(&replication_state->lock);
1027 : :
1028 : : /*
1029 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 : : * otherwise be dropped anytime.
1031 : : */
1032 : 238 : LWLockRelease(ReplicationOriginLock);
1033 : : }
1034 : :
1035 : :
1036 : : XLogRecPtr
1037 : 8 : replorigin_get_progress(RepOriginId node, bool flush)
1038 : : {
1039 : : int i;
1040 : 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1041 : 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1042 : :
1043 : : /* prevent slots from being concurrently dropped */
1044 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045 : :
221 msawada@postgresql.o 1046 [ + + ]: 38 : for (i = 0; i < max_active_replication_origins; i++)
1047 : : {
1048 : : ReplicationState *state;
1049 : :
3835 andres@anarazel.de 1050 : 35 : state = &replication_states[i];
1051 : :
1052 [ + + ]: 35 : if (state->roident == node)
1053 : : {
1054 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1055 : :
1056 : 5 : remote_lsn = state->remote_lsn;
1057 : 5 : local_lsn = state->local_lsn;
1058 : :
1059 : 5 : LWLockRelease(&state->lock);
1060 : :
1061 : 5 : break;
1062 : : }
1063 : : }
1064 : :
1065 : 8 : LWLockRelease(ReplicationOriginLock);
1066 : :
1067 [ + + + - ]: 8 : if (flush && local_lsn != InvalidXLogRecPtr)
1068 : 1 : XLogFlush(local_lsn);
1069 : :
1070 : 8 : return remote_lsn;
1071 : : }
1072 : :
1073 : : /*
1074 : : * Tear down a (possibly) configured session replication origin during process
1075 : : * exit.
1076 : : */
1077 : : static void
1078 : 468 : ReplicationOriginExitCleanup(int code, Datum arg)
1079 : : {
2997 tgl@sss.pgh.pa.us 1080 : 468 : ConditionVariable *cv = NULL;
1081 : :
652 alvherre@alvh.no-ip. 1082 [ + + ]: 468 : if (session_replication_state == NULL)
1083 : 188 : return;
1084 : :
3835 andres@anarazel.de 1085 : 280 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086 : :
652 alvherre@alvh.no-ip. 1087 [ + + ]: 280 : if (session_replication_state->acquired_by == MyProcPid)
1088 : : {
3003 1089 : 270 : cv = &session_replication_state->origin_cv;
1090 : :
3835 andres@anarazel.de 1091 : 270 : session_replication_state->acquired_by = 0;
1092 : 270 : session_replication_state = NULL;
1093 : : }
1094 : :
1095 : 280 : LWLockRelease(ReplicationOriginLock);
1096 : :
3003 alvherre@alvh.no-ip. 1097 [ + + ]: 280 : if (cv)
1098 : 270 : ConditionVariableBroadcast(cv);
1099 : : }
1100 : :
1101 : : /*
1102 : : * Setup a replication origin in the shared memory struct if it doesn't
1103 : : * already exist and cache access to the specific ReplicationSlot so the
1104 : : * array doesn't have to be searched when calling
1105 : : * replorigin_session_advance().
1106 : : *
1107 : : * Normally only one such cached origin can exist per process so the cached
1108 : : * value can only be set again after the previous value is torn down with
1109 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110 : : * (meaning the slot is not allowed to be already acquired by another process).
1111 : : *
1112 : : * However, sometimes multiple processes can safely re-use the same origin slot
1113 : : * (for example, multiple parallel apply processes can safely use the same
1114 : : * origin, provided they maintain commit order by allowing only one process to
1115 : : * commit at a time). For this case the first process must pass acquired_by =
1116 : : * 0, and then the other processes sharing that same origin can pass
1117 : : * acquired_by = PID of the first process.
1118 : : */
1119 : : void
1023 akapila@postgresql.o 1120 : 472 : replorigin_session_setup(RepOriginId node, int acquired_by)
1121 : : {
1122 : : static bool registered_cleanup;
1123 : : int i;
3811 bruce@momjian.us 1124 : 472 : int free_slot = -1;
1125 : :
3835 andres@anarazel.de 1126 [ + + ]: 472 : if (!registered_cleanup)
1127 : : {
1128 : 468 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1129 : 468 : registered_cleanup = true;
1130 : : }
1131 : :
221 msawada@postgresql.o 1132 [ - + ]: 472 : Assert(max_active_replication_origins > 0);
1133 : :
3835 andres@anarazel.de 1134 [ + + ]: 472 : if (session_replication_state != NULL)
1135 [ + - ]: 1 : ereport(ERROR,
1136 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 : : errmsg("cannot setup replication origin when one is already setup")));
1138 : :
1139 : : /* Lock exclusively, as we may have to create a new table entry. */
1140 : 471 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141 : :
1142 : : /*
1143 : : * Search for either an existing slot for the origin, or a free one we can
1144 : : * use.
1145 : : */
221 msawada@postgresql.o 1146 [ + + ]: 1925 : for (i = 0; i < max_active_replication_origins; i++)
1147 : : {
3835 andres@anarazel.de 1148 : 1808 : ReplicationState *curstate = &replication_states[i];
1149 : :
1150 : : /* remember where to insert if necessary */
1151 [ + + + + ]: 1808 : if (curstate->roident == InvalidRepOriginId &&
1152 : : free_slot == -1)
1153 : : {
1154 : 119 : free_slot = i;
1155 : 119 : continue;
1156 : : }
1157 : :
1158 : : /* not our slot */
1159 [ + + ]: 1689 : if (curstate->roident != node)
1160 : 1335 : continue;
1161 : :
1023 akapila@postgresql.o 1162 [ + + - + ]: 354 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 : : {
3835 andres@anarazel.de 1164 [ # # ]:UBC 0 : ereport(ERROR,
1165 : : (errcode(ERRCODE_OBJECT_IN_USE),
1166 : : errmsg("replication origin with ID %d is already active for PID %d",
1167 : : curstate->roident, curstate->acquired_by)));
1168 : : }
1169 : :
39 akapila@postgresql.o 1170 [ - + ]:GNC 354 : else if (curstate->acquired_by != acquired_by)
1171 : : {
39 akapila@postgresql.o 1172 [ # # ]:UNC 0 : ereport(ERROR,
1173 : : (errcode(ERRCODE_OBJECT_IN_USE),
1174 : : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175 : : node, acquired_by)));
1176 : : }
1177 : :
1178 : : /* ok, found slot */
3835 andres@anarazel.de 1179 :CBC 354 : session_replication_state = curstate;
706 akapila@postgresql.o 1180 : 354 : break;
1181 : : }
1182 : :
1183 : :
3835 andres@anarazel.de 1184 [ + + - + ]: 471 : if (session_replication_state == NULL && free_slot == -1)
3835 andres@anarazel.de 1185 [ # # ]:UBC 0 : ereport(ERROR,
1186 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1187 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1188 : : node),
1189 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
3835 andres@anarazel.de 1190 [ + + ]:CBC 471 : else if (session_replication_state == NULL)
1191 : : {
39 akapila@postgresql.o 1192 [ + + ]:GNC 117 : if (acquired_by)
1193 [ + - ]: 1 : ereport(ERROR,
1194 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1195 : : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1196 : : acquired_by, node)));
1197 : :
1198 : : /* initialize new slot */
3835 andres@anarazel.de 1199 :CBC 116 : session_replication_state = &replication_states[free_slot];
1200 [ - + ]: 116 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1201 [ - + ]: 116 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1202 : 116 : session_replication_state->roident = node;
1203 : : }
1204 : :
1205 : :
1206 [ - + ]: 470 : Assert(session_replication_state->roident != InvalidRepOriginId);
1207 : :
1023 akapila@postgresql.o 1208 [ + + ]: 470 : if (acquired_by == 0)
1209 : 459 : session_replication_state->acquired_by = MyProcPid;
1210 : : else
39 akapila@postgresql.o 1211 [ - + ]:GNC 11 : Assert(session_replication_state->acquired_by == acquired_by);
1212 : :
3835 andres@anarazel.de 1213 :CBC 470 : LWLockRelease(ReplicationOriginLock);
1214 : :
1215 : : /* probably this one is pointless */
3003 alvherre@alvh.no-ip. 1216 : 470 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
3835 andres@anarazel.de 1217 : 470 : }
1218 : :
1219 : : /*
1220 : : * Reset replay state previously setup in this session.
1221 : : *
1222 : : * This function may only be called if an origin was setup with
1223 : : * replorigin_session_setup().
1224 : : */
1225 : : void
1226 : 191 : replorigin_session_reset(void)
1227 : : {
1228 : : ConditionVariable *cv;
1229 : :
221 msawada@postgresql.o 1230 [ - + ]: 191 : Assert(max_active_replication_origins != 0);
1231 : :
3835 andres@anarazel.de 1232 [ + + ]: 191 : if (session_replication_state == NULL)
1233 [ + - ]: 1 : ereport(ERROR,
1234 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1235 : : errmsg("no replication origin is configured")));
1236 : :
1237 : 190 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1238 : :
1239 : 190 : session_replication_state->acquired_by = 0;
3003 alvherre@alvh.no-ip. 1240 : 190 : cv = &session_replication_state->origin_cv;
3835 andres@anarazel.de 1241 : 190 : session_replication_state = NULL;
1242 : :
1243 : 190 : LWLockRelease(ReplicationOriginLock);
1244 : :
3003 alvherre@alvh.no-ip. 1245 : 190 : ConditionVariableBroadcast(cv);
3835 andres@anarazel.de 1246 : 190 : }
1247 : :
1248 : : /*
1249 : : * Do the same work replorigin_advance() does, just on the session's
1250 : : * configured origin.
1251 : : *
1252 : : * This is noticeably cheaper than using replorigin_advance().
1253 : : */
1254 : : void
1255 : 1093 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1256 : : {
1257 [ - + ]: 1093 : Assert(session_replication_state != NULL);
1258 [ - + ]: 1093 : Assert(session_replication_state->roident != InvalidRepOriginId);
1259 : :
1260 : 1093 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1261 [ + - ]: 1093 : if (session_replication_state->local_lsn < local_commit)
1262 : 1093 : session_replication_state->local_lsn = local_commit;
1263 [ + + ]: 1093 : if (session_replication_state->remote_lsn < remote_commit)
1264 : 509 : session_replication_state->remote_lsn = remote_commit;
1265 : 1093 : LWLockRelease(&session_replication_state->lock);
1266 : 1093 : }
1267 : :
1268 : : /*
1269 : : * Ask the machinery about the point up to which we successfully replayed
1270 : : * changes from an already setup replication origin.
1271 : : */
1272 : : XLogRecPtr
1273 : 261 : replorigin_session_get_progress(bool flush)
1274 : : {
1275 : : XLogRecPtr remote_lsn;
1276 : : XLogRecPtr local_lsn;
1277 : :
1278 [ - + ]: 261 : Assert(session_replication_state != NULL);
1279 : :
1280 : 261 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1281 : 261 : remote_lsn = session_replication_state->remote_lsn;
1282 : 261 : local_lsn = session_replication_state->local_lsn;
1283 : 261 : LWLockRelease(&session_replication_state->lock);
1284 : :
1285 [ + + + - ]: 261 : if (flush && local_lsn != InvalidXLogRecPtr)
1286 : 1 : XLogFlush(local_lsn);
1287 : :
1288 : 261 : return remote_lsn;
1289 : : }
1290 : :
1291 : :
1292 : :
1293 : : /* ---------------------------------------------------------------------------
1294 : : * SQL functions for working with replication origin.
1295 : : *
1296 : : * These mostly should be fairly short wrappers around more generic functions.
1297 : : * ---------------------------------------------------------------------------
1298 : : */
1299 : :
1300 : : /*
1301 : : * Create replication origin for the passed in name, and return the assigned
1302 : : * oid.
1303 : : */
1304 : : Datum
1305 : 13 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1306 : : {
1307 : : char *name;
1308 : : RepOriginId roident;
1309 : :
1310 : 13 : replorigin_check_prerequisites(false, false);
1311 : :
1312 : 13 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1313 : :
1314 : : /*
1315 : : * Replication origins "any and "none" are reserved for system options.
1316 : : * The origins "pg_xxx" are reserved for internal use.
1317 : : */
1195 akapila@postgresql.o 1318 [ + + + + ]: 13 : if (IsReservedName(name) || IsReservedOriginName(name))
2313 tgl@sss.pgh.pa.us 1319 [ + - ]: 3 : ereport(ERROR,
1320 : : (errcode(ERRCODE_RESERVED_NAME),
1321 : : errmsg("replication origin name \"%s\" is reserved",
1322 : : name),
1323 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1324 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1325 : :
1326 : : /*
1327 : : * If built with appropriate switch, whine when regression-testing
1328 : : * conventions for replication origin names are violated.
1329 : : */
1330 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1331 : : if (strncmp(name, "regress_", 8) != 0)
1332 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1333 : : #endif
1334 : :
3835 andres@anarazel.de 1335 : 10 : roident = replorigin_create(name);
1336 : :
1337 : 6 : pfree(name);
1338 : :
1339 : 6 : PG_RETURN_OID(roident);
1340 : : }
1341 : :
1342 : : /*
1343 : : * Drop replication origin.
1344 : : */
1345 : : Datum
1346 : 8 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1347 : : {
1348 : : char *name;
1349 : :
1350 : 8 : replorigin_check_prerequisites(false, false);
1351 : :
1352 : 8 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1353 : :
1721 akapila@postgresql.o 1354 : 8 : replorigin_drop_by_name(name, false, true);
1355 : :
3835 andres@anarazel.de 1356 : 7 : pfree(name);
1357 : :
1358 : 7 : PG_RETURN_VOID();
1359 : : }
1360 : :
1361 : : /*
1362 : : * Return oid of a replication origin.
1363 : : */
1364 : : Datum
3835 andres@anarazel.de 1365 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1366 : : {
1367 : : char *name;
1368 : : RepOriginId roident;
1369 : :
1370 : 0 : replorigin_check_prerequisites(false, false);
1371 : :
1372 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1373 : 0 : roident = replorigin_by_name(name, true);
1374 : :
1375 : 0 : pfree(name);
1376 : :
1377 [ # # ]: 0 : if (OidIsValid(roident))
1378 : 0 : PG_RETURN_OID(roident);
1379 : 0 : PG_RETURN_NULL();
1380 : : }
1381 : :
1382 : : /*
1383 : : * Setup a replication origin for this session.
1384 : : */
1385 : : Datum
3835 andres@anarazel.de 1386 :CBC 9 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1387 : : {
1388 : : char *name;
1389 : : RepOriginId origin;
1390 : : int pid;
1391 : :
1392 : 9 : replorigin_check_prerequisites(true, false);
1393 : :
1394 : 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1395 : 9 : origin = replorigin_by_name(name, false);
39 akapila@postgresql.o 1396 :GNC 8 : pid = PG_GETARG_INT32(1);
1397 : 8 : replorigin_session_setup(origin, pid);
1398 : :
3683 alvherre@alvh.no-ip. 1399 :CBC 6 : replorigin_session_origin = origin;
1400 : :
3835 andres@anarazel.de 1401 : 6 : pfree(name);
1402 : :
1403 : 6 : PG_RETURN_VOID();
1404 : : }
1405 : :
1406 : : /*
1407 : : * Reset previously setup origin in this session
1408 : : */
1409 : : Datum
1410 : 7 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1411 : : {
1412 : 7 : replorigin_check_prerequisites(true, false);
1413 : :
1414 : 7 : replorigin_session_reset();
1415 : :
3683 alvherre@alvh.no-ip. 1416 : 6 : replorigin_session_origin = InvalidRepOriginId;
1417 : 6 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1418 : 6 : replorigin_session_origin_timestamp = 0;
1419 : :
3835 andres@anarazel.de 1420 : 6 : PG_RETURN_VOID();
1421 : : }
1422 : :
1423 : : /*
1424 : : * Has a replication origin been setup for this session.
1425 : : */
1426 : : Datum
3835 andres@anarazel.de 1427 :GBC 2 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1428 : : {
1429 : 2 : replorigin_check_prerequisites(false, false);
1430 : :
3683 alvherre@alvh.no-ip. 1431 : 2 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1432 : : }
1433 : :
1434 : :
1435 : : /*
1436 : : * Return the replication progress for origin setup in the current session.
1437 : : *
1438 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1439 : : * to a local transaction that has been flushed. This is useful if asynchronous
1440 : : * commits are used when replaying replicated transactions.
1441 : : */
1442 : : Datum
3835 andres@anarazel.de 1443 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1444 : : {
1445 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1446 : 2 : bool flush = PG_GETARG_BOOL(0);
1447 : :
1448 : 2 : replorigin_check_prerequisites(true, false);
1449 : :
1450 [ - + ]: 2 : if (session_replication_state == NULL)
3835 andres@anarazel.de 1451 [ # # ]:UBC 0 : ereport(ERROR,
1452 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1453 : : errmsg("no replication origin is configured")));
1454 : :
3835 andres@anarazel.de 1455 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1456 : :
1457 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3835 andres@anarazel.de 1458 :UBC 0 : PG_RETURN_NULL();
1459 : :
3835 andres@anarazel.de 1460 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1461 : : }
1462 : :
1463 : : Datum
1464 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1465 : : {
1466 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1467 : :
1468 : 1 : replorigin_check_prerequisites(true, false);
1469 : :
1470 [ - + ]: 1 : if (session_replication_state == NULL)
3835 andres@anarazel.de 1471 [ # # ]:UBC 0 : ereport(ERROR,
1472 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473 : : errmsg("no replication origin is configured")));
1474 : :
3683 alvherre@alvh.no-ip. 1475 :CBC 1 : replorigin_session_origin_lsn = location;
1476 : 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1477 : :
3835 andres@anarazel.de 1478 : 1 : PG_RETURN_VOID();
1479 : : }
1480 : :
1481 : : Datum
3835 andres@anarazel.de 1482 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1483 : : {
1484 : 0 : replorigin_check_prerequisites(true, false);
1485 : :
3683 alvherre@alvh.no-ip. 1486 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1487 : 0 : replorigin_session_origin_timestamp = 0;
1488 : :
3835 andres@anarazel.de 1489 : 0 : PG_RETURN_VOID();
1490 : : }
1491 : :
1492 : :
1493 : : Datum
3835 andres@anarazel.de 1494 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1495 : : {
3152 noah@leadboat.com 1496 : 3 : text *name = PG_GETARG_TEXT_PP(0);
3811 bruce@momjian.us 1497 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1498 : : RepOriginId node;
1499 : :
3835 andres@anarazel.de 1500 : 3 : replorigin_check_prerequisites(true, false);
1501 : :
1502 : : /* lock to prevent the replication origin from vanishing */
1503 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1504 : :
1505 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1506 : :
1507 : : /*
1508 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1509 : : * xact hasn't committed yet. This is why this function should be used to
1510 : : * set up the initial replication state, but not for replay.
1511 : : */
1512 : 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1513 : : true /* go backward */ , true /* WAL log */ );
1514 : :
1515 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1516 : :
1517 : 2 : PG_RETURN_VOID();
1518 : : }
1519 : :
1520 : :
1521 : : /*
1522 : : * Return the replication progress for an individual replication origin.
1523 : : *
1524 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1525 : : * to a local transaction that has been flushed. This is useful if asynchronous
1526 : : * commits are used when replaying replicated transactions.
1527 : : */
1528 : : Datum
1529 : 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1530 : : {
1531 : : char *name;
1532 : : bool flush;
1533 : : RepOriginId roident;
1534 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1535 : :
1536 : 3 : replorigin_check_prerequisites(true, true);
1537 : :
1538 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1539 : 3 : flush = PG_GETARG_BOOL(1);
1540 : :
1541 : 3 : roident = replorigin_by_name(name, false);
1542 [ - + ]: 2 : Assert(OidIsValid(roident));
1543 : :
1544 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1545 : :
1546 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3835 andres@anarazel.de 1547 :UBC 0 : PG_RETURN_NULL();
1548 : :
3835 andres@anarazel.de 1549 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1550 : : }
1551 : :
1552 : :
1553 : : Datum
1554 : 10 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1555 : : {
1556 : 10 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1557 : : int i;
1558 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1559 : :
1560 : : /* we want to return 0 rows if slot is set to zero */
1561 : 10 : replorigin_check_prerequisites(false, true);
1562 : :
1106 michael@paquier.xyz 1563 : 10 : InitMaterializedSRF(fcinfo, 0);
1564 : :
1565 : : /* prevent slots from being concurrently dropped */
3835 andres@anarazel.de 1566 : 10 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1567 : :
1568 : : /*
1569 : : * Iterate through all possible replication_states, display if they are
1570 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1571 : : * of date values are a possibility.
1572 : : */
221 msawada@postgresql.o 1573 [ + + ]: 110 : for (i = 0; i < max_active_replication_origins; i++)
1574 : : {
1575 : : ReplicationState *state;
1576 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1577 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1578 : : char *roname;
1579 : :
3835 andres@anarazel.de 1580 : 100 : state = &replication_states[i];
1581 : :
1582 : : /* unused slot, nothing to display */
1583 [ + + ]: 100 : if (state->roident == InvalidRepOriginId)
1584 : 87 : continue;
1585 : :
1586 : 13 : memset(values, 0, sizeof(values));
1587 : 13 : memset(nulls, 1, sizeof(nulls));
1588 : :
1589 : 13 : values[0] = ObjectIdGetDatum(state->roident);
1590 : 13 : nulls[0] = false;
1591 : :
1592 : : /*
1593 : : * We're not preventing the origin to be dropped concurrently, so
1594 : : * silently accept that it might be gone.
1595 : : */
1596 [ + - ]: 13 : if (replorigin_by_oid(state->roident, true,
1597 : : &roname))
1598 : : {
1599 : 13 : values[1] = CStringGetTextDatum(roname);
1600 : 13 : nulls[1] = false;
1601 : : }
1602 : :
1603 : 13 : LWLockAcquire(&state->lock, LW_SHARED);
1604 : :
3811 bruce@momjian.us 1605 : 13 : values[2] = LSNGetDatum(state->remote_lsn);
3835 andres@anarazel.de 1606 : 13 : nulls[2] = false;
1607 : :
1608 : 13 : values[3] = LSNGetDatum(state->local_lsn);
1609 : 13 : nulls[3] = false;
1610 : :
1611 : 13 : LWLockRelease(&state->lock);
1612 : :
1331 michael@paquier.xyz 1613 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1614 : : values, nulls);
1615 : : }
1616 : :
3835 andres@anarazel.de 1617 : 10 : LWLockRelease(ReplicationOriginLock);
1618 : :
1619 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1620 : :
1621 : 10 : return (Datum) 0;
1622 : : }
|