Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * A replication origin consists of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origins have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now we only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allows us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can setup one replication origin that's from then used as
38 : : * the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
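47 : : * * To create replication origins an exclusive lock on
48 : : * pg_replication_origin is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
 : : * Dropping an origin takes a row exclusive lock on pg_replication_origin
 : : * plus an access exclusive lock on the origin itself, which prevents
 : : * concurrent drops of the same origin.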
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not setup as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/guc.h"
94 : : #include "utils/pg_lsn.h"
95 : : #include "utils/rel.h"
96 : : #include "utils/snapmgr.h"
97 : : #include "utils/syscache.h"
98 : :
99 : : /* Paths of the replication origin checkpoint file and of the temporary file the checkpoint is written to before being renamed into place. */
100 : : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101 : : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102 : :
103 : : /* GUC variable: length of the shared replication_states array; with 0 no shared state is allocated and origins cannot be queried or manipulated. */
104 : : int max_active_replication_origins = 10;
105 : :
106 : : /*
107 : : * Replay progress of a single remote node; one element of the shared replication_states array.
108 : : */
109 : : typedef struct ReplicationState
110 : : {
111 : : /*
112 : : * Local identifier for the remote node; InvalidRepOriginId marks an unused entry.
113 : : */
114 : : RepOriginId roident;
115 : :
116 : : /*
117 : : * Location of the latest commit from the remote side.
118 : : */
119 : : XLogRecPtr remote_lsn;
120 : :
121 : : /*
122 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
123 : : * during a checkpoint so we know the commit record actually is safe on
124 : : * disk. This field is not part of the on-disk checkpoint format.
125 : : */
126 : : XLogRecPtr local_lsn;
127 : :
128 : : /*
129 : : * PID of the backend that has acquired this slot, or 0 if none.
130 : : */
131 : : int acquired_by;
132 : :
133 : : /*
134 : : * Condition variable that's signaled when acquired_by changes; dropping a busy origin sleeps on it.
135 : : */
136 : : ConditionVariable origin_cv;
137 : :
138 : : /*
139 : : * Lock protecting remote_lsn and local_lsn.
140 : : */
141 : : LWLock lock;
142 : : } ReplicationState;
143 : :
144 : : /*
145 : : * On disk version of ReplicationState: only roident and remote_lsn are written to the checkpoint file.
146 : : */
147 : : typedef struct ReplicationStateOnDisk
148 : : {
149 : : RepOriginId roident;
150 : : XLogRecPtr remote_lsn;
151 : : } ReplicationStateOnDisk;
152 : :
153 : :
/* Layout of the shared memory block set up by ReplicationOriginShmemInit: a small header followed by the per-origin state array. */
154 : : typedef struct ReplicationStateCtl
155 : : {
156 : : /* Tranche to use for per-origin LWLocks */
157 : : int tranche_id;
158 : : /* Array of length max_active_replication_origins */
159 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
160 : : } ReplicationStateCtl;
161 : :
162 : : /* external variables (non-static, so visible to other source files) */
163 : : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164 : : XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
165 : : TimestampTz replorigin_session_origin_timestamp = 0;
166 : :
167 : : /*
168 : : * Base address of the shared memory array of replication states, of size
169 : : * max_active_replication_origins. Points at replication_states_ctl->states.
170 : : */
171 : : static ReplicationState *replication_states;
172 : :
173 : : /*
174 : : * Actual shared memory block, set up in ReplicationOriginShmemInit (replication_states[] is part of this).
175 : : */
176 : : static ReplicationStateCtl *replication_states_ctl;
177 : :
178 : : /*
179 : : * We keep a pointer to this backend's ReplicationState to avoid having to
180 : : * search the replication_states array in replorigin_session_advance for each
181 : : * remote commit. (Ownership of a backend's own entry can only be changed by
182 : : * that backend.)
183 : : */
184 : : static ReplicationState *session_replication_state = NULL;
185 : :
186 : : /* Magic number written first in the on-disk checkpoint file and verified when the file is read back. */
187 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188 : :
189 : : static void
169 msawada@postgresql.o 190 :CBC 47 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191 : : {
192 [ + + - + ]: 47 : if (check_origins && max_active_replication_origins == 0)
3783 andres@anarazel.de 193 [ # # ]:UBC 0 : ereport(ERROR,
194 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196 : :
3783 andres@anarazel.de 197 [ + + - + ]:CBC 47 : if (!recoveryOK && RecoveryInProgress())
3783 andres@anarazel.de 198 [ # # ]:UBC 0 : ereport(ERROR,
199 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200 : : errmsg("cannot manipulate replication origins during recovery")));
3783 andres@anarazel.de 201 :CBC 47 : }
202 : :
203 : :
204 : : /*
205 : : * IsReservedOriginName
206 : : * True iff name is either "none" or "any".
207 : : */
208 : : static bool
1143 akapila@postgresql.o 209 : 11 : IsReservedOriginName(const char *name)
210 : : {
211 [ + + + + ]: 21 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212 : 10 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213 : : }
214 : :
215 : : /* ---------------------------------------------------------------------------
216 : : * Functions for working with replication origins themselves.
217 : : * ---------------------------------------------------------------------------
218 : : */
219 : :
220 : : /*
221 : : * Check for a persistent replication origin identified by name.
222 : : *
223 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224 : : */
225 : : RepOriginId
1549 peter@eisentraut.org 226 : 969 : replorigin_by_name(const char *roname, bool missing_ok)
227 : : {
228 : : Form_pg_replication_origin ident;
3759 bruce@momjian.us 229 : 969 : Oid roident = InvalidOid;
230 : : HeapTuple tuple;
231 : : Datum roname_d;
232 : :
3783 andres@anarazel.de 233 : 969 : roname_d = CStringGetTextDatum(roname);
234 : :
235 : 969 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236 [ + + ]: 969 : if (HeapTupleIsValid(tuple))
237 : : {
238 : 584 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239 : 584 : roident = ident->roident;
240 : 584 : ReleaseSysCache(tuple);
241 : : }
242 [ + + ]: 385 : else if (!missing_ok)
2893 rhaas@postgresql.org 243 [ + - ]: 4 : ereport(ERROR,
244 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
245 : : errmsg("replication origin \"%s\" does not exist",
246 : : roname)));
247 : :
3783 andres@anarazel.de 248 : 965 : return roident;
249 : : }
250 : :
251 : : /*
252 : : * Create a replication origin.
253 : : *
254 : : * Needs to be called in a transaction.
255 : : */
256 : : RepOriginId
1549 peter@eisentraut.org 257 : 370 : replorigin_create(const char *roname)
258 : : {
259 : : Oid roident;
3759 bruce@momjian.us 260 : 370 : HeapTuple tuple = NULL;
261 : : Relation rel;
262 : : Datum roname_d;
263 : : SnapshotData SnapshotDirty;
264 : : SysScanDesc scan;
265 : : ScanKeyData key;
266 : :
267 : : /*
268 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
269 : : * replication origin names to 512 bytes. This should be more than enough
270 : : * for all practical use.
271 : : */
122 nathan@postgresql.or 272 [ + + ]: 370 : if (strlen(roname) > MAX_RONAME_LEN)
273 [ + - ]: 3 : ereport(ERROR,
274 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275 : : errmsg("replication origin name is too long"),
276 : : errdetail("Replication origin names must be no longer than %d bytes.",
277 : : MAX_RONAME_LEN)));
278 : :
3783 andres@anarazel.de 279 : 367 : roname_d = CStringGetTextDatum(roname);
280 : :
281 [ - + ]: 367 : Assert(IsTransactionState());
282 : :
283 : : /*
284 : : * We need the numeric replication origin to be 16bit wide, so we cannot
285 : : * rely on the normal oid allocation. Instead we simply scan
286 : : * pg_replication_origin for the first unused id. That's not particularly
287 : : * efficient, but this should be a fairly infrequent operation - we can
288 : : * easily spend a bit more code on this when it turns out it needs to be
289 : : * faster.
290 : : *
291 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
292 : : * over the table for the duration of the search. Because we use a "dirty
293 : : * snapshot" we can read rows that other in-progress sessions have
294 : : * written, even though they would be invisible with normal snapshots. Due
295 : : * to the exclusive lock there's no danger that new rows can appear while
296 : : * we're checking.
297 : : */
298 : 367 : InitDirtySnapshot(SnapshotDirty);
299 : :
2420 300 : 367 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301 : :
302 : : /*
303 : : * We want to be able to access pg_replication_origin without setting up a
304 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
305 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
306 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
307 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
308 : : * be sure to set up a snapshot everywhere it might be needed. For more
309 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310 : : */
122 nathan@postgresql.or 311 [ - + ]: 367 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312 : :
3782 andres@anarazel.de 313 [ + - ]: 669 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314 : : {
315 : : bool nulls[Natts_pg_replication_origin];
316 : : Datum values[Natts_pg_replication_origin];
317 : : bool collides;
318 : :
3783 319 [ - + ]: 669 : CHECK_FOR_INTERRUPTS();
320 : :
321 : 669 : ScanKeyInit(&key,
322 : : Anum_pg_replication_origin_roident,
323 : : BTEqualStrategyNumber, F_OIDEQ,
324 : : ObjectIdGetDatum(roident));
325 : :
326 : 669 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327 : : true /* indexOK */ ,
328 : : &SnapshotDirty,
329 : : 1, &key);
330 : :
331 : 669 : collides = HeapTupleIsValid(systable_getnext(scan));
332 : :
333 : 669 : systable_endscan(scan);
334 : :
335 [ + + ]: 669 : if (!collides)
336 : : {
337 : : /*
338 : : * Ok, found an unused roident, insert the new row and do a CCI,
339 : : * so our callers can look it up if they want to.
340 : : */
341 : 367 : memset(&nulls, 0, sizeof(nulls));
342 : :
3759 bruce@momjian.us 343 : 367 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
3783 andres@anarazel.de 344 : 367 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
345 : :
346 : 367 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
3140 alvherre@alvh.no-ip. 347 : 367 : CatalogTupleInsert(rel, tuple);
3783 andres@anarazel.de 348 : 366 : CommandCounterIncrement();
349 : 366 : break;
350 : : }
351 : : }
352 : :
353 : : /* now release lock again, */
2420 354 : 366 : table_close(rel, ExclusiveLock);
355 : :
3783 356 [ - + ]: 366 : if (tuple == NULL)
3783 andres@anarazel.de 357 [ # # ]:UBC 0 : ereport(ERROR,
358 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359 : : errmsg("could not find free replication origin ID")));
360 : :
3783 andres@anarazel.de 361 :CBC 366 : heap_freetuple(tuple);
362 : 366 : return roident;
363 : : }
364 : :
365 : : /*
366 : : * Helper function to drop a replication origin.
367 : : */
368 : : static void
946 akapila@postgresql.o 369 : 301 : replorigin_state_clear(RepOriginId roident, bool nowait)
370 : : {
371 : : int i;
372 : :
373 : : /*
374 : : * Clean up the slot state info, if there is any matching slot.
375 : : */
2951 alvherre@alvh.no-ip. 376 : 301 : restart:
3783 andres@anarazel.de 377 : 301 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378 : :
169 msawada@postgresql.o 379 [ + + ]: 1006 : for (i = 0; i < max_active_replication_origins; i++)
380 : : {
3783 andres@anarazel.de 381 : 961 : ReplicationState *state = &replication_states[i];
382 : :
383 [ + + ]: 961 : if (state->roident == roident)
384 : : {
385 : : /* found our slot, is it busy? */
386 [ - + ]: 256 : if (state->acquired_by != 0)
387 : : {
388 : : ConditionVariable *cv;
389 : :
2951 alvherre@alvh.no-ip. 390 [ # # ]:UBC 0 : if (nowait)
391 [ # # ]: 0 : ereport(ERROR,
392 : : (errcode(ERRCODE_OBJECT_IN_USE),
393 : : errmsg("could not drop replication origin with ID %d, in use by PID %d",
394 : : state->roident,
395 : : state->acquired_by)));
396 : :
397 : : /*
398 : : * We must wait and then retry. Since we don't know which CV
399 : : * to wait on until here, we can't readily use
400 : : * ConditionVariablePrepareToSleep (calling it here would be
401 : : * wrong, since we could miss the signal if we did so); just
402 : : * use ConditionVariableSleep directly.
403 : : */
404 : 0 : cv = &state->origin_cv;
405 : :
406 : 0 : LWLockRelease(ReplicationOriginLock);
407 : :
408 : 0 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409 : 0 : goto restart;
410 : : }
411 : :
412 : : /* first make a WAL log entry */
413 : : {
414 : : xl_replorigin_drop xlrec;
415 : :
3783 andres@anarazel.de 416 :CBC 256 : xlrec.node_id = roident;
417 : 256 : XLogBeginInsert();
207 peter@eisentraut.org 418 : 256 : XLogRegisterData(&xlrec, sizeof(xlrec));
3783 andres@anarazel.de 419 : 256 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420 : : }
421 : :
422 : : /* then clear the in-memory slot */
423 : 256 : state->roident = InvalidRepOriginId;
424 : 256 : state->remote_lsn = InvalidXLogRecPtr;
425 : 256 : state->local_lsn = InvalidXLogRecPtr;
426 : 256 : break;
427 : : }
428 : : }
429 : 301 : LWLockRelease(ReplicationOriginLock);
2797 tgl@sss.pgh.pa.us 430 : 301 : ConditionVariableCancelSleep();
3783 andres@anarazel.de 431 : 301 : }
432 : :
433 : : /*
434 : : * Drop replication origin (by name).
435 : : *
436 : : * Needs to be called in a transaction.
437 : : */
438 : : void
1549 peter@eisentraut.org 439 : 489 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440 : : {
441 : : RepOriginId roident;
442 : : Relation rel;
443 : : HeapTuple tuple;
444 : :
1669 akapila@postgresql.o 445 [ - + ]: 489 : Assert(IsTransactionState());
446 : :
946 447 : 489 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448 : :
1669 449 : 489 : roident = replorigin_by_name(name, missing_ok);
450 : :
451 : : /* Lock the origin to prevent concurrent drops. */
946 452 : 488 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
453 : : AccessExclusiveLock);
454 : :
455 : 488 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456 [ + + ]: 488 : if (!HeapTupleIsValid(tuple))
457 : : {
458 [ - + ]: 187 : if (!missing_ok)
946 akapila@postgresql.o 459 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
460 : : roident);
461 : :
462 : : /*
463 : : * We don't need to retain the locks if the origin is already dropped.
464 : : */
946 akapila@postgresql.o 465 :CBC 187 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
466 : : AccessExclusiveLock);
467 : 187 : table_close(rel, RowExclusiveLock);
468 : 187 : return;
469 : : }
470 : :
471 : 301 : replorigin_state_clear(roident, nowait);
472 : :
473 : : /*
474 : : * Now, we can delete the catalog entry.
475 : : */
476 : 301 : CatalogTupleDelete(rel, &tuple->t_self);
477 : 301 : ReleaseSysCache(tuple);
478 : :
479 : 301 : CommandCounterIncrement();
480 : :
481 : : /* We keep the lock on pg_replication_origin until commit */
1669 482 : 301 : table_close(rel, NoLock);
483 : : }
484 : :
485 : : /*
486 : : * Lookup replication origin via its oid and return the name.
487 : : *
488 : : * The external name is palloc'd in the calling context.
489 : : *
490 : : * Returns true if the origin is known, false otherwise.
491 : : */
492 : : bool
3783 andres@anarazel.de 493 : 27 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494 : : {
495 : : HeapTuple tuple;
496 : : Form_pg_replication_origin ric;
497 : :
498 [ - + ]: 27 : Assert(OidIsValid((Oid) roident));
499 [ - + ]: 27 : Assert(roident != InvalidRepOriginId);
500 [ - + ]: 27 : Assert(roident != DoNotReplicateId);
501 : :
502 : 27 : tuple = SearchSysCache1(REPLORIGIDENT,
503 : : ObjectIdGetDatum((Oid) roident));
504 : :
505 [ + + ]: 27 : if (HeapTupleIsValid(tuple))
506 : : {
507 : 22 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508 : 22 : *roname = text_to_cstring(&ric->roname);
509 : 22 : ReleaseSysCache(tuple);
510 : :
511 : 22 : return true;
512 : : }
513 : : else
514 : : {
515 : 5 : *roname = NULL;
516 : :
517 [ - + ]: 5 : if (!missing_ok)
2893 rhaas@postgresql.org 518 [ # # ]:UBC 0 : ereport(ERROR,
519 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
520 : : errmsg("replication origin with ID %d does not exist",
521 : : roident)));
522 : :
3783 andres@anarazel.de 523 :CBC 5 : return false;
524 : : }
525 : : }
526 : :
527 : :
528 : : /* ---------------------------------------------------------------------------
529 : : * Functions for handling replication progress.
530 : : * ---------------------------------------------------------------------------
531 : : */
532 : :
533 : : Size
534 : 3965 : ReplicationOriginShmemSize(void)
535 : : {
536 : 3965 : Size size = 0;
537 : :
169 msawada@postgresql.o 538 [ + + ]: 3965 : if (max_active_replication_origins == 0)
3783 andres@anarazel.de 539 : 2 : return size;
540 : :
541 : 3963 : size = add_size(size, offsetof(ReplicationStateCtl, states));
542 : :
543 : 3963 : size = add_size(size,
544 : : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545 : 3963 : return size;
546 : : }
547 : :
548 : : void
549 : 1029 : ReplicationOriginShmemInit(void)
550 : : {
551 : : bool found;
552 : :
169 msawada@postgresql.o 553 [ + + ]: 1029 : if (max_active_replication_origins == 0)
3783 andres@anarazel.de 554 : 1 : return;
555 : :
556 : 1028 : replication_states_ctl = (ReplicationStateCtl *)
557 : 1028 : ShmemInitStruct("ReplicationOriginState",
558 : : ReplicationOriginShmemSize(),
559 : : &found);
3759 bruce@momjian.us 560 : 1028 : replication_states = replication_states_ctl->states;
561 : :
3783 andres@anarazel.de 562 [ + - ]: 1028 : if (!found)
563 : : {
564 : : int i;
565 : :
1940 tgl@sss.pgh.pa.us 566 [ + - + - : 73953 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - + - +
+ ]
567 : :
568 : 1028 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569 : :
169 msawada@postgresql.o 570 [ + + ]: 11299 : for (i = 0; i < max_active_replication_origins; i++)
571 : : {
3783 andres@anarazel.de 572 : 10271 : LWLockInitialize(&replication_states[i].lock,
573 : 10271 : replication_states_ctl->tranche_id);
2951 alvherre@alvh.no-ip. 574 : 10271 : ConditionVariableInit(&replication_states[i].origin_cv);
575 : : }
576 : : }
577 : : }
578 : :
579 : : /* ---------------------------------------------------------------------------
580 : : * Perform a checkpoint of each replication origin's progress with respect to
581 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
582 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583 : : * if the transactions were originally committed asynchronously.
584 : : *
585 : : * We store checkpoints in the following format:
586 : : * +-------+------------------------+------------------+-----+--------+
587 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588 : : * +-------+------------------------+------------------+-----+--------+
589 : : *
590 : : * So its just the magic, followed by the statically sized
591 : : * ReplicationStateOnDisk structs. Note that the maximum number of
592 : : * ReplicationState is determined by max_active_replication_origins.
593 : : * ---------------------------------------------------------------------------
594 : : */
595 : : void
3783 andres@anarazel.de 596 : 1677 : CheckPointReplicationOrigin(void)
597 : : {
372 michael@paquier.xyz 598 : 1677 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599 : 1677 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600 : : int tmpfd;
601 : : int i;
3783 andres@anarazel.de 602 : 1677 : uint32 magic = REPLICATION_STATE_MAGIC;
603 : : pg_crc32c crc;
604 : :
169 msawada@postgresql.o 605 [ + + ]: 1677 : if (max_active_replication_origins == 0)
3783 andres@anarazel.de 606 : 1 : return;
607 : :
608 : 1676 : INIT_CRC32C(crc);
609 : :
610 : : /* make sure no old temp file is remaining */
611 [ + - - + ]: 1676 : if (unlink(tmppath) < 0 && errno != ENOENT)
3783 andres@anarazel.de 612 [ # # ]:UBC 0 : ereport(PANIC,
613 : : (errcode_for_file_access(),
614 : : errmsg("could not remove file \"%s\": %m",
615 : : tmppath)));
616 : :
617 : : /*
618 : : * no other backend can perform this at the same time; only one checkpoint
619 : : * can happen at a time.
620 : : */
2905 peter_e@gmx.net 621 :CBC 1676 : tmpfd = OpenTransientFile(tmppath,
622 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3783 andres@anarazel.de 623 [ - + ]: 1676 : if (tmpfd < 0)
3783 andres@anarazel.de 624 [ # # ]:UBC 0 : ereport(PANIC,
625 : : (errcode_for_file_access(),
626 : : errmsg("could not create file \"%s\": %m",
627 : : tmppath)));
628 : :
629 : : /* write magic */
2589 michael@paquier.xyz 630 :CBC 1676 : errno = 0;
3783 andres@anarazel.de 631 [ - + ]: 1676 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632 : : {
633 : : /* if write didn't set errno, assume problem is no disk space */
2334 michael@paquier.xyz 634 [ # # ]:UBC 0 : if (errno == 0)
635 : 0 : errno = ENOSPC;
3783 andres@anarazel.de 636 [ # # ]: 0 : ereport(PANIC,
637 : : (errcode_for_file_access(),
638 : : errmsg("could not write to file \"%s\": %m",
639 : : tmppath)));
640 : : }
3783 andres@anarazel.de 641 :CBC 1676 : COMP_CRC32C(crc, &magic, sizeof(magic));
642 : :
643 : : /* prevent concurrent creations/drops */
644 : 1676 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645 : :
646 : : /* write actual data */
169 msawada@postgresql.o 647 [ + + ]: 18436 : for (i = 0; i < max_active_replication_origins; i++)
648 : : {
649 : : ReplicationStateOnDisk disk_state;
3783 andres@anarazel.de 650 : 16760 : ReplicationState *curstate = &replication_states[i];
651 : : XLogRecPtr local_lsn;
652 : :
653 [ + + ]: 16760 : if (curstate->roident == InvalidRepOriginId)
654 : 16707 : continue;
655 : :
656 : : /* zero, to avoid uninitialized padding bytes */
3058 657 : 53 : memset(&disk_state, 0, sizeof(disk_state));
658 : :
3783 659 : 53 : LWLockAcquire(&curstate->lock, LW_SHARED);
660 : :
661 : 53 : disk_state.roident = curstate->roident;
662 : :
663 : 53 : disk_state.remote_lsn = curstate->remote_lsn;
664 : 53 : local_lsn = curstate->local_lsn;
665 : :
666 : 53 : LWLockRelease(&curstate->lock);
667 : :
668 : : /* make sure we only write out a commit that's persistent */
669 : 53 : XLogFlush(local_lsn);
670 : :
2589 michael@paquier.xyz 671 : 53 : errno = 0;
3783 andres@anarazel.de 672 [ - + ]: 53 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673 : : sizeof(disk_state))
674 : : {
675 : : /* if write didn't set errno, assume problem is no disk space */
2334 michael@paquier.xyz 676 [ # # ]:UBC 0 : if (errno == 0)
677 : 0 : errno = ENOSPC;
3783 andres@anarazel.de 678 [ # # ]: 0 : ereport(PANIC,
679 : : (errcode_for_file_access(),
680 : : errmsg("could not write to file \"%s\": %m",
681 : : tmppath)));
682 : : }
683 : :
3783 andres@anarazel.de 684 :CBC 53 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685 : : }
686 : :
687 : 1676 : LWLockRelease(ReplicationOriginLock);
688 : :
689 : : /* write out the CRC */
690 : 1676 : FIN_CRC32C(crc);
2589 michael@paquier.xyz 691 : 1676 : errno = 0;
3783 andres@anarazel.de 692 [ - + ]: 1676 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693 : : {
694 : : /* if write didn't set errno, assume problem is no disk space */
2334 michael@paquier.xyz 695 [ # # ]:UBC 0 : if (errno == 0)
696 : 0 : errno = ENOSPC;
3783 andres@anarazel.de 697 [ # # ]: 0 : ereport(PANIC,
698 : : (errcode_for_file_access(),
699 : : errmsg("could not write to file \"%s\": %m",
700 : : tmppath)));
701 : : }
702 : :
2254 peter@eisentraut.org 703 [ - + ]:CBC 1676 : if (CloseTransientFile(tmpfd) != 0)
2373 michael@paquier.xyz 704 [ # # ]:UBC 0 : ereport(PANIC,
705 : : (errcode_for_file_access(),
706 : : errmsg("could not close file \"%s\": %m",
707 : : tmppath)));
708 : :
709 : : /* fsync, rename to permanent file, fsync file and directory */
3468 andres@anarazel.de 710 :CBC 1676 : durable_rename(tmppath, path, PANIC);
711 : : }
712 : :
713 : : /*
714 : : * Recover replication replay status from checkpoint data saved earlier by
715 : : * CheckPointReplicationOrigin.
716 : : *
717 : : * This only needs to be called at startup and *not* during every checkpoint
718 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719 : : * state thereafter can be recovered by looking at commit records.
720 : : */
721 : : void
3783 722 : 887 : StartupReplicationOrigin(void)
723 : : {
372 michael@paquier.xyz 724 : 887 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725 : : int fd;
726 : : int readBytes;
3759 bruce@momjian.us 727 : 887 : uint32 magic = REPLICATION_STATE_MAGIC;
728 : 887 : int last_state = 0;
729 : : pg_crc32c file_crc;
730 : : pg_crc32c crc;
731 : :
732 : : /* don't want to overwrite already existing state */
733 : : #ifdef USE_ASSERT_CHECKING
734 : : static bool already_started = false;
735 : :
3783 andres@anarazel.de 736 [ - + ]: 887 : Assert(!already_started);
737 : 887 : already_started = true;
738 : : #endif
739 : :
169 msawada@postgresql.o 740 [ + + ]: 887 : if (max_active_replication_origins == 0)
3783 andres@anarazel.de 741 : 51 : return;
742 : :
743 : 886 : INIT_CRC32C(crc);
744 : :
745 [ + + ]: 886 : elog(DEBUG2, "starting up replication origin progress state");
746 : :
2905 peter_e@gmx.net 747 : 886 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748 : :
749 : : /*
750 : : * might have had max_active_replication_origins == 0 last run, or we just
751 : : * brought up a standby.
752 : : */
3783 andres@anarazel.de 753 [ + + + - ]: 886 : if (fd < 0 && errno == ENOENT)
754 : 50 : return;
755 [ - + ]: 836 : else if (fd < 0)
3783 andres@anarazel.de 756 [ # # ]:UBC 0 : ereport(PANIC,
757 : : (errcode_for_file_access(),
758 : : errmsg("could not open file \"%s\": %m",
759 : : path)));
760 : :
761 : : /* verify magic, that is written even if nothing was active */
3783 andres@anarazel.de 762 :CBC 836 : readBytes = read(fd, &magic, sizeof(magic));
763 [ - + ]: 836 : if (readBytes != sizeof(magic))
764 : : {
2607 michael@paquier.xyz 765 [ # # ]:UBC 0 : if (readBytes < 0)
766 [ # # ]: 0 : ereport(PANIC,
767 : : (errcode_for_file_access(),
768 : : errmsg("could not read file \"%s\": %m",
769 : : path)));
770 : : else
771 [ # # ]: 0 : ereport(PANIC,
772 : : (errcode(ERRCODE_DATA_CORRUPTED),
773 : : errmsg("could not read file \"%s\": read %d of %zu",
774 : : path, readBytes, sizeof(magic))));
775 : : }
3783 andres@anarazel.de 776 :CBC 836 : COMP_CRC32C(crc, &magic, sizeof(magic));
777 : :
778 [ - + ]: 836 : if (magic != REPLICATION_STATE_MAGIC)
3783 andres@anarazel.de 779 [ # # ]:UBC 0 : ereport(PANIC,
780 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
781 : : magic, REPLICATION_STATE_MAGIC)));
782 : :
783 : : /* we can skip locking here, no other access is possible */
784 : :
785 : : /* recover individual states, until there are no more to be found */
786 : : while (true)
3783 andres@anarazel.de 787 :CBC 27 : {
788 : : ReplicationStateOnDisk disk_state;
789 : :
790 : 863 : readBytes = read(fd, &disk_state, sizeof(disk_state));
791 : :
792 : : /* no further data */
793 [ + + ]: 863 : if (readBytes == sizeof(crc))
794 : : {
795 : : /* not pretty, but simple ... */
3759 bruce@momjian.us 796 : 836 : file_crc = *(pg_crc32c *) &disk_state;
3783 andres@anarazel.de 797 : 836 : break;
798 : : }
799 : :
800 [ - + ]: 27 : if (readBytes < 0)
801 : : {
3783 andres@anarazel.de 802 [ # # ]:UBC 0 : ereport(PANIC,
803 : : (errcode_for_file_access(),
804 : : errmsg("could not read file \"%s\": %m",
805 : : path)));
806 : : }
807 : :
3783 andres@anarazel.de 808 [ - + ]:CBC 27 : if (readBytes != sizeof(disk_state))
809 : : {
3783 andres@anarazel.de 810 [ # # ]:UBC 0 : ereport(PANIC,
811 : : (errcode_for_file_access(),
812 : : errmsg("could not read file \"%s\": read %d of %zu",
813 : : path, readBytes, sizeof(disk_state))));
814 : : }
815 : :
3783 andres@anarazel.de 816 :CBC 27 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817 : :
169 msawada@postgresql.o 818 [ - + ]: 27 : if (last_state == max_active_replication_origins)
3783 andres@anarazel.de 819 [ # # ]:UBC 0 : ereport(PANIC,
820 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822 : :
823 : : /* copy data to shared memory */
3783 andres@anarazel.de 824 :CBC 27 : replication_states[last_state].roident = disk_state.roident;
825 : 27 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826 : 27 : last_state++;
827 : :
1737 peter@eisentraut.org 828 [ + - ]: 27 : ereport(LOG,
829 : : errmsg("recovered replication state of node %d to %X/%08X",
830 : : disk_state.roident,
831 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
832 : : }
833 : :
834 : : /* now check checksum */
3783 andres@anarazel.de 835 : 836 : FIN_CRC32C(crc);
836 [ - + ]: 836 : if (file_crc != crc)
3783 andres@anarazel.de 837 [ # # ]:UBC 0 : ereport(PANIC,
838 : : (errcode(ERRCODE_DATA_CORRUPTED),
839 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840 : : crc, file_crc)));
841 : :
2254 peter@eisentraut.org 842 [ - + ]:CBC 836 : if (CloseTransientFile(fd) != 0)
2373 michael@paquier.xyz 843 [ # # ]:UBC 0 : ereport(PANIC,
844 : : (errcode_for_file_access(),
845 : : errmsg("could not close file \"%s\": %m",
846 : : path)));
847 : : }
848 : :
849 : : void
3783 andres@anarazel.de 850 :CBC 4 : replorigin_redo(XLogReaderState *record)
851 : : {
852 : 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853 : :
854 [ + + - ]: 4 : switch (info)
855 : : {
856 : 2 : case XLOG_REPLORIGIN_SET:
857 : : {
858 : 2 : xl_replorigin_set *xlrec =
841 tgl@sss.pgh.pa.us 859 : 2 : (xl_replorigin_set *) XLogRecGetData(record);
860 : :
3783 andres@anarazel.de 861 : 2 : replorigin_advance(xlrec->node_id,
862 : : xlrec->remote_lsn, record->EndRecPtr,
3759 bruce@momjian.us 863 : 2 : xlrec->force /* backward */ ,
864 : : false /* WAL log */ );
3783 andres@anarazel.de 865 : 2 : break;
866 : : }
867 : 2 : case XLOG_REPLORIGIN_DROP:
868 : : {
869 : : xl_replorigin_drop *xlrec;
870 : : int i;
871 : :
872 : 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873 : :
169 msawada@postgresql.o 874 [ + - ]: 3 : for (i = 0; i < max_active_replication_origins; i++)
875 : : {
3783 andres@anarazel.de 876 : 3 : ReplicationState *state = &replication_states[i];
877 : :
878 : : /* found our slot */
879 [ + + ]: 3 : if (state->roident == xlrec->node_id)
880 : : {
881 : : /* reset entry */
882 : 2 : state->roident = InvalidRepOriginId;
883 : 2 : state->remote_lsn = InvalidXLogRecPtr;
884 : 2 : state->local_lsn = InvalidXLogRecPtr;
885 : 2 : break;
886 : : }
887 : : }
888 : 2 : break;
889 : : }
3783 andres@anarazel.de 890 :UBC 0 : default:
891 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
892 : : }
3783 andres@anarazel.de 893 :CBC 4 : }
894 : :
895 : :
896 : : /*
897 : : * Tell the replication origin progress machinery that a commit from 'node'
898 : : * that originated at the LSN remote_commit on the remote node was replayed
899 : : * successfully and that we don't need to do so again. In combination with
900 : : * setting up replorigin_session_origin_lsn and replorigin_session_origin
901 : : * that ensures we won't lose knowledge about that after a crash if the
902 : : * transaction had a persistent effect (think of asynchronous commits).
903 : : *
904 : : * local_commit needs to be a local LSN of the commit so that we can make sure
905 : : * upon a checkpoint that enough WAL has been persisted to disk.
906 : : *
907 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908 : : * unless running in recovery.
909 : : */
910 : : void
911 : 237 : replorigin_advance(RepOriginId node,
912 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
913 : : bool go_backward, bool wal_log)
914 : : {
915 : : int i;
916 : 237 : ReplicationState *replication_state = NULL;
917 : 237 : ReplicationState *free_state = NULL;
918 : :
919 [ - + ]: 237 : Assert(node != InvalidRepOriginId);
920 : :
921 : : /* we don't track DoNotReplicateId */
922 [ - + ]: 237 : if (node == DoNotReplicateId)
3783 andres@anarazel.de 923 :UBC 0 : return;
924 : :
925 : : /*
926 : : * XXX: For the case where this is called by WAL replay, it'd be more
927 : : * efficient to restore into a backend local hashtable and only dump into
928 : : * shmem after recovery is finished. Let's wait with implementing that
929 : : * till it's shown to be a measurable expense
930 : : */
931 : :
932 : : /* Lock exclusively, as we may have to create a new table entry. */
3783 andres@anarazel.de 933 :CBC 237 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934 : :
935 : : /*
936 : : * Search for either an existing slot for the origin, or a free one we can
937 : : * use.
938 : : */
169 msawada@postgresql.o 939 [ + + ]: 2186 : for (i = 0; i < max_active_replication_origins; i++)
940 : : {
3783 andres@anarazel.de 941 : 1992 : ReplicationState *curstate = &replication_states[i];
942 : :
943 : : /* remember where to insert if necessary */
944 [ + + + + ]: 1992 : if (curstate->roident == InvalidRepOriginId &&
945 : : free_state == NULL)
946 : : {
947 : 195 : free_state = curstate;
948 : 195 : continue;
949 : : }
950 : :
951 : : /* not our slot */
952 [ + + ]: 1797 : if (curstate->roident != node)
953 : : {
954 : 1754 : continue;
955 : : }
956 : :
957 : : /* ok, found slot */
958 : 43 : replication_state = curstate;
959 : :
960 : 43 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961 : :
962 : : /* Make sure it's not used by somebody else */
963 [ - + ]: 43 : if (replication_state->acquired_by != 0)
964 : : {
3783 andres@anarazel.de 965 [ # # ]:UBC 0 : ereport(ERROR,
966 : : (errcode(ERRCODE_OBJECT_IN_USE),
967 : : errmsg("replication origin with ID %d is already active for PID %d",
968 : : replication_state->roident,
969 : : replication_state->acquired_by)));
970 : : }
971 : :
3783 andres@anarazel.de 972 :CBC 43 : break;
973 : : }
974 : :
975 [ + + - + ]: 237 : if (replication_state == NULL && free_state == NULL)
3783 andres@anarazel.de 976 [ # # ]:UBC 0 : ereport(ERROR,
977 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978 : : errmsg("could not find free replication state slot for replication origin with ID %d",
979 : : node),
980 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
981 : :
3783 andres@anarazel.de 982 [ + + ]:CBC 237 : if (replication_state == NULL)
983 : : {
984 : : /* initialize new slot */
985 : 194 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986 : 194 : replication_state = free_state;
987 [ - + ]: 194 : Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988 [ - + ]: 194 : Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989 : 194 : replication_state->roident = node;
990 : : }
991 : :
992 [ - + ]: 237 : Assert(replication_state->roident != InvalidRepOriginId);
993 : :
994 : : /*
995 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996 : : * and the standby gets the message. Primarily this will be called during
997 : : * WAL replay (of commit records) where no WAL logging is necessary.
998 : : */
999 [ + + ]: 237 : if (wal_log)
1000 : : {
1001 : : xl_replorigin_set xlrec;
1002 : :
1003 : 196 : xlrec.remote_lsn = remote_commit;
1004 : 196 : xlrec.node_id = node;
1005 : 196 : xlrec.force = go_backward;
1006 : :
1007 : 196 : XLogBeginInsert();
207 peter@eisentraut.org 1008 : 196 : XLogRegisterData(&xlrec, sizeof(xlrec));
1009 : :
3783 andres@anarazel.de 1010 : 196 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011 : : }
1012 : :
1013 : : /*
1014 : : * Due to - harmless - race conditions during a checkpoint we could see
1015 : : * values here that are older than the ones we already have in memory. We
1016 : : * could also see older values for prepared transactions when the prepare
1017 : : * is sent at a later point of time along with commit prepared and there
1018 : : * are other transactions commits between prepare and commit prepared. See
1019 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1020 : : */
1021 [ + + + + ]: 237 : if (go_backward || replication_state->remote_lsn < remote_commit)
1022 : 230 : replication_state->remote_lsn = remote_commit;
1023 [ + + + + ]: 237 : if (local_commit != InvalidXLogRecPtr &&
1024 [ + - ]: 38 : (go_backward || replication_state->local_lsn < local_commit))
1025 : 40 : replication_state->local_lsn = local_commit;
1026 : 237 : LWLockRelease(&replication_state->lock);
1027 : :
1028 : : /*
1029 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1030 : : * otherwise be dropped anytime.
1031 : : */
1032 : 237 : LWLockRelease(ReplicationOriginLock);
1033 : : }
1034 : :
1035 : :
1036 : : XLogRecPtr
1037 : 8 : replorigin_get_progress(RepOriginId node, bool flush)
1038 : : {
1039 : : int i;
1040 : 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1041 : 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1042 : :
1043 : : /* prevent slots from being concurrently dropped */
1044 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045 : :
169 msawada@postgresql.o 1046 [ + + ]: 38 : for (i = 0; i < max_active_replication_origins; i++)
1047 : : {
1048 : : ReplicationState *state;
1049 : :
3783 andres@anarazel.de 1050 : 35 : state = &replication_states[i];
1051 : :
1052 [ + + ]: 35 : if (state->roident == node)
1053 : : {
1054 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1055 : :
1056 : 5 : remote_lsn = state->remote_lsn;
1057 : 5 : local_lsn = state->local_lsn;
1058 : :
1059 : 5 : LWLockRelease(&state->lock);
1060 : :
1061 : 5 : break;
1062 : : }
1063 : : }
1064 : :
1065 : 8 : LWLockRelease(ReplicationOriginLock);
1066 : :
1067 [ + + + - ]: 8 : if (flush && local_lsn != InvalidXLogRecPtr)
1068 : 1 : XLogFlush(local_lsn);
1069 : :
1070 : 8 : return remote_lsn;
1071 : : }
1072 : :
1073 : : /*
1074 : : * Tear down a (possibly) configured session replication origin during process
1075 : : * exit.
1076 : : */
1077 : : static void
1078 : 463 : ReplicationOriginExitCleanup(int code, Datum arg)
1079 : : {
2945 tgl@sss.pgh.pa.us 1080 : 463 : ConditionVariable *cv = NULL;
1081 : :
600 alvherre@alvh.no-ip. 1082 [ + + ]: 463 : if (session_replication_state == NULL)
1083 : 185 : return;
1084 : :
3783 andres@anarazel.de 1085 : 278 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086 : :
600 alvherre@alvh.no-ip. 1087 [ + + ]: 278 : if (session_replication_state->acquired_by == MyProcPid)
1088 : : {
2951 1089 : 268 : cv = &session_replication_state->origin_cv;
1090 : :
3783 andres@anarazel.de 1091 : 268 : session_replication_state->acquired_by = 0;
1092 : 268 : session_replication_state = NULL;
1093 : : }
1094 : :
1095 : 278 : LWLockRelease(ReplicationOriginLock);
1096 : :
2951 alvherre@alvh.no-ip. 1097 [ + + ]: 278 : if (cv)
1098 : 268 : ConditionVariableBroadcast(cv);
1099 : : }
1100 : :
1101 : : /*
1102 : : * Setup a replication origin in the shared memory struct if it doesn't
1103 : : * already exist and cache access to the specific ReplicationSlot so the
1104 : : * array doesn't have to be searched when calling
1105 : : * replorigin_session_advance().
1106 : : *
1107 : : * Normally only one such cached origin can exist per process so the cached
1108 : : * value can only be set again after the previous value is torn down with
1109 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110 : : * (meaning the slot is not allowed to be already acquired by another process).
1111 : : *
1112 : : * However, sometimes multiple processes can safely re-use the same origin slot
1113 : : * (for example, multiple parallel apply processes can safely use the same
1114 : : * origin, provided they maintain commit order by allowing only one process to
1115 : : * commit at a time). For this case the first process must pass acquired_by =
1116 : : * 0, and then the other processes sharing that same origin can pass
1117 : : * acquired_by = PID of the first process.
1118 : : */
1119 : : void
971 akapila@postgresql.o 1120 : 466 : replorigin_session_setup(RepOriginId node, int acquired_by)
1121 : : {
1122 : : static bool registered_cleanup;
1123 : : int i;
3759 bruce@momjian.us 1124 : 466 : int free_slot = -1;
1125 : :
3783 andres@anarazel.de 1126 [ + + ]: 466 : if (!registered_cleanup)
1127 : : {
1128 : 463 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1129 : 463 : registered_cleanup = true;
1130 : : }
1131 : :
169 msawada@postgresql.o 1132 [ - + ]: 466 : Assert(max_active_replication_origins > 0);
1133 : :
3783 andres@anarazel.de 1134 [ + + ]: 466 : if (session_replication_state != NULL)
1135 [ + - ]: 1 : ereport(ERROR,
1136 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137 : : errmsg("cannot setup replication origin when one is already setup")));
1138 : :
1139 : : /* Lock exclusively, as we may have to create a new table entry. */
1140 : 465 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141 : :
1142 : : /*
1143 : : * Search for either an existing slot for the origin, or a free one we can
1144 : : * use.
1145 : : */
169 msawada@postgresql.o 1146 [ + + ]: 1885 : for (i = 0; i < max_active_replication_origins; i++)
1147 : : {
3783 andres@anarazel.de 1148 : 1771 : ReplicationState *curstate = &replication_states[i];
1149 : :
1150 : : /* remember where to insert if necessary */
1151 [ + + + + ]: 1771 : if (curstate->roident == InvalidRepOriginId &&
1152 : : free_slot == -1)
1153 : : {
1154 : 117 : free_slot = i;
1155 : 117 : continue;
1156 : : }
1157 : :
1158 : : /* not our slot */
1159 [ + + ]: 1654 : if (curstate->roident != node)
1160 : 1303 : continue;
1161 : :
971 akapila@postgresql.o 1162 [ + + - + ]: 351 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1163 : : {
3783 andres@anarazel.de 1164 [ # # ]:UBC 0 : ereport(ERROR,
1165 : : (errcode(ERRCODE_OBJECT_IN_USE),
1166 : : errmsg("replication origin with ID %d is already active for PID %d",
1167 : : curstate->roident, curstate->acquired_by)));
1168 : : }
1169 : :
1170 : : /* ok, found slot */
3783 andres@anarazel.de 1171 :CBC 351 : session_replication_state = curstate;
654 akapila@postgresql.o 1172 : 351 : break;
1173 : : }
1174 : :
1175 : :
3783 andres@anarazel.de 1176 [ + + - + ]: 465 : if (session_replication_state == NULL && free_slot == -1)
3783 andres@anarazel.de 1177 [ # # ]:UBC 0 : ereport(ERROR,
1178 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1179 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1180 : : node),
1181 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
3783 andres@anarazel.de 1182 [ + + ]:CBC 465 : else if (session_replication_state == NULL)
1183 : : {
1184 : : /* initialize new slot */
1185 : 114 : session_replication_state = &replication_states[free_slot];
1186 [ - + ]: 114 : Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1187 [ - + ]: 114 : Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1188 : 114 : session_replication_state->roident = node;
1189 : : }
1190 : :
1191 : :
1192 [ - + ]: 465 : Assert(session_replication_state->roident != InvalidRepOriginId);
1193 : :
971 akapila@postgresql.o 1194 [ + + ]: 465 : if (acquired_by == 0)
1195 : 455 : session_replication_state->acquired_by = MyProcPid;
1196 [ - + ]: 10 : else if (session_replication_state->acquired_by != acquired_by)
971 akapila@postgresql.o 1197 [ # # ]:UBC 0 : elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
1198 : : node, acquired_by);
1199 : :
3783 andres@anarazel.de 1200 :CBC 465 : LWLockRelease(ReplicationOriginLock);
1201 : :
1202 : : /* probably this one is pointless */
2951 alvherre@alvh.no-ip. 1203 : 465 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
3783 andres@anarazel.de 1204 : 465 : }
1205 : :
1206 : : /*
1207 : : * Reset replay state previously setup in this session.
1208 : : *
1209 : : * This function may only be called if an origin was setup with
1210 : : * replorigin_session_setup().
1211 : : */
1212 : : void
1213 : 188 : replorigin_session_reset(void)
1214 : : {
1215 : : ConditionVariable *cv;
1216 : :
169 msawada@postgresql.o 1217 [ - + ]: 188 : Assert(max_active_replication_origins != 0);
1218 : :
3783 andres@anarazel.de 1219 [ + + ]: 188 : if (session_replication_state == NULL)
1220 [ + - ]: 1 : ereport(ERROR,
1221 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1222 : : errmsg("no replication origin is configured")));
1223 : :
1224 : 187 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1225 : :
1226 : 187 : session_replication_state->acquired_by = 0;
2951 alvherre@alvh.no-ip. 1227 : 187 : cv = &session_replication_state->origin_cv;
3783 andres@anarazel.de 1228 : 187 : session_replication_state = NULL;
1229 : :
1230 : 187 : LWLockRelease(ReplicationOriginLock);
1231 : :
2951 alvherre@alvh.no-ip. 1232 : 187 : ConditionVariableBroadcast(cv);
3783 andres@anarazel.de 1233 : 187 : }
1234 : :
1235 : : /*
1236 : : * Do the same work replorigin_advance() does, just on the session's
1237 : : * configured origin.
1238 : : *
1239 : : * This is noticeably cheaper than using replorigin_advance().
1240 : : */
1241 : : void
1242 : 1087 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1243 : : {
1244 [ - + ]: 1087 : Assert(session_replication_state != NULL);
1245 [ - + ]: 1087 : Assert(session_replication_state->roident != InvalidRepOriginId);
1246 : :
1247 : 1087 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1248 [ + - ]: 1087 : if (session_replication_state->local_lsn < local_commit)
1249 : 1087 : session_replication_state->local_lsn = local_commit;
1250 [ + + ]: 1087 : if (session_replication_state->remote_lsn < remote_commit)
1251 : 509 : session_replication_state->remote_lsn = remote_commit;
1252 : 1087 : LWLockRelease(&session_replication_state->lock);
1253 : 1087 : }
1254 : :
1255 : : /*
1256 : : * Ask the machinery about the point up to which we successfully replayed
1257 : : * changes from an already setup replication origin.
1258 : : */
1259 : : XLogRecPtr
1260 : 259 : replorigin_session_get_progress(bool flush)
1261 : : {
1262 : : XLogRecPtr remote_lsn;
1263 : : XLogRecPtr local_lsn;
1264 : :
1265 [ - + ]: 259 : Assert(session_replication_state != NULL);
1266 : :
1267 : 259 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1268 : 259 : remote_lsn = session_replication_state->remote_lsn;
1269 : 259 : local_lsn = session_replication_state->local_lsn;
1270 : 259 : LWLockRelease(&session_replication_state->lock);
1271 : :
1272 [ + + + - ]: 259 : if (flush && local_lsn != InvalidXLogRecPtr)
1273 : 1 : XLogFlush(local_lsn);
1274 : :
1275 : 259 : return remote_lsn;
1276 : : }
1277 : :
1278 : :
1279 : :
1280 : : /* ---------------------------------------------------------------------------
1281 : : * SQL functions for working with replication origin.
1282 : : *
1283 : : * These mostly should be fairly short wrappers around more generic functions.
1284 : : * ---------------------------------------------------------------------------
1285 : : */
1286 : :
1287 : : /*
1288 : : * Create replication origin for the passed in name, and return the assigned
1289 : : * oid.
1290 : : */
1291 : : Datum
1292 : 12 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1293 : : {
1294 : : char *name;
1295 : : RepOriginId roident;
1296 : :
1297 : 12 : replorigin_check_prerequisites(false, false);
1298 : :
1299 : 12 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1300 : :
1301 : : /*
1302 : : * Replication origins "any and "none" are reserved for system options.
1303 : : * The origins "pg_xxx" are reserved for internal use.
1304 : : */
1143 akapila@postgresql.o 1305 [ + + + + ]: 12 : if (IsReservedName(name) || IsReservedOriginName(name))
2261 tgl@sss.pgh.pa.us 1306 [ + - ]: 3 : ereport(ERROR,
1307 : : (errcode(ERRCODE_RESERVED_NAME),
1308 : : errmsg("replication origin name \"%s\" is reserved",
1309 : : name),
1310 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1311 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1312 : :
1313 : : /*
1314 : : * If built with appropriate switch, whine when regression-testing
1315 : : * conventions for replication origin names are violated.
1316 : : */
1317 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1318 : : if (strncmp(name, "regress_", 8) != 0)
1319 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1320 : : #endif
1321 : :
3783 andres@anarazel.de 1322 : 9 : roident = replorigin_create(name);
1323 : :
1324 : 5 : pfree(name);
1325 : :
1326 : 5 : PG_RETURN_OID(roident);
1327 : : }
1328 : :
1329 : : /*
1330 : : * Drop replication origin.
1331 : : */
1332 : : Datum
1333 : 7 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1334 : : {
1335 : : char *name;
1336 : :
1337 : 7 : replorigin_check_prerequisites(false, false);
1338 : :
1339 : 7 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1340 : :
1669 akapila@postgresql.o 1341 : 7 : replorigin_drop_by_name(name, false, true);
1342 : :
3783 andres@anarazel.de 1343 : 6 : pfree(name);
1344 : :
1345 : 6 : PG_RETURN_VOID();
1346 : : }
1347 : :
1348 : : /*
1349 : : * Return oid of a replication origin.
1350 : : */
1351 : : Datum
3783 andres@anarazel.de 1352 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1353 : : {
1354 : : char *name;
1355 : : RepOriginId roident;
1356 : :
1357 : 0 : replorigin_check_prerequisites(false, false);
1358 : :
1359 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1360 : 0 : roident = replorigin_by_name(name, true);
1361 : :
1362 : 0 : pfree(name);
1363 : :
1364 [ # # ]: 0 : if (OidIsValid(roident))
1365 : 0 : PG_RETURN_OID(roident);
1366 : 0 : PG_RETURN_NULL();
1367 : : }
1368 : :
1369 : : /*
1370 : : * Setup a replication origin for this session.
1371 : : */
1372 : : Datum
3783 andres@anarazel.de 1373 :CBC 6 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1374 : : {
1375 : : char *name;
1376 : : RepOriginId origin;
1377 : :
1378 : 6 : replorigin_check_prerequisites(true, false);
1379 : :
1380 : 6 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1381 : 6 : origin = replorigin_by_name(name, false);
971 akapila@postgresql.o 1382 : 5 : replorigin_session_setup(origin, 0);
1383 : :
3631 alvherre@alvh.no-ip. 1384 : 4 : replorigin_session_origin = origin;
1385 : :
3783 andres@anarazel.de 1386 : 4 : pfree(name);
1387 : :
1388 : 4 : PG_RETURN_VOID();
1389 : : }
1390 : :
1391 : : /*
1392 : : * Reset previously setup origin in this session
1393 : : */
1394 : : Datum
1395 : 5 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1396 : : {
1397 : 5 : replorigin_check_prerequisites(true, false);
1398 : :
1399 : 5 : replorigin_session_reset();
1400 : :
3631 alvherre@alvh.no-ip. 1401 : 4 : replorigin_session_origin = InvalidRepOriginId;
1402 : 4 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1403 : 4 : replorigin_session_origin_timestamp = 0;
1404 : :
3783 andres@anarazel.de 1405 : 4 : PG_RETURN_VOID();
1406 : : }
1407 : :
1408 : : /*
1409 : : * Has a replication origin been setup for this session.
1410 : : */
1411 : : Datum
3783 andres@anarazel.de 1412 :UBC 0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1413 : : {
1414 : 0 : replorigin_check_prerequisites(false, false);
1415 : :
3631 alvherre@alvh.no-ip. 1416 : 0 : PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1417 : : }
1418 : :
1419 : :
1420 : : /*
1421 : : * Return the replication progress for origin setup in the current session.
1422 : : *
1423 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1424 : : * to a local transaction that has been flushed. This is useful if asynchronous
1425 : : * commits are used when replaying replicated transactions.
1426 : : */
1427 : : Datum
3783 andres@anarazel.de 1428 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1429 : : {
1430 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1431 : 2 : bool flush = PG_GETARG_BOOL(0);
1432 : :
1433 : 2 : replorigin_check_prerequisites(true, false);
1434 : :
1435 [ - + ]: 2 : if (session_replication_state == NULL)
3783 andres@anarazel.de 1436 [ # # ]:UBC 0 : ereport(ERROR,
1437 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1438 : : errmsg("no replication origin is configured")));
1439 : :
3783 andres@anarazel.de 1440 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1441 : :
1442 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3783 andres@anarazel.de 1443 :UBC 0 : PG_RETURN_NULL();
1444 : :
3783 andres@anarazel.de 1445 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1446 : : }
1447 : :
1448 : : Datum
1449 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1450 : : {
1451 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1452 : :
1453 : 1 : replorigin_check_prerequisites(true, false);
1454 : :
1455 [ - + ]: 1 : if (session_replication_state == NULL)
3783 andres@anarazel.de 1456 [ # # ]:UBC 0 : ereport(ERROR,
1457 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1458 : : errmsg("no replication origin is configured")));
1459 : :
3631 alvherre@alvh.no-ip. 1460 :CBC 1 : replorigin_session_origin_lsn = location;
1461 : 1 : replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1462 : :
3783 andres@anarazel.de 1463 : 1 : PG_RETURN_VOID();
1464 : : }
1465 : :
1466 : : Datum
3783 andres@anarazel.de 1467 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1468 : : {
1469 : 0 : replorigin_check_prerequisites(true, false);
1470 : :
3631 alvherre@alvh.no-ip. 1471 : 0 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
1472 : 0 : replorigin_session_origin_timestamp = 0;
1473 : :
3783 andres@anarazel.de 1474 : 0 : PG_RETURN_VOID();
1475 : : }
1476 : :
1477 : :
1478 : : Datum
3783 andres@anarazel.de 1479 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1480 : : {
3100 noah@leadboat.com 1481 : 3 : text *name = PG_GETARG_TEXT_PP(0);
3759 bruce@momjian.us 1482 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1483 : : RepOriginId node;
1484 : :
3783 andres@anarazel.de 1485 : 3 : replorigin_check_prerequisites(true, false);
1486 : :
1487 : : /* lock to prevent the replication origin from vanishing */
1488 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1489 : :
1490 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1491 : :
1492 : : /*
1493 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1494 : : * xact hasn't committed yet. This is why this function should be used to
1495 : : * set up the initial replication state, but not for replay.
1496 : : */
1497 : 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1498 : : true /* go backward */ , true /* WAL log */ );
1499 : :
1500 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1501 : :
1502 : 2 : PG_RETURN_VOID();
1503 : : }
1504 : :
1505 : :
1506 : : /*
1507 : : * Return the replication progress for an individual replication origin.
1508 : : *
1509 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1510 : : * to a local transaction that has been flushed. This is useful if asynchronous
1511 : : * commits are used when replaying replicated transactions.
1512 : : */
1513 : : Datum
1514 : 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1515 : : {
1516 : : char *name;
1517 : : bool flush;
1518 : : RepOriginId roident;
1519 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1520 : :
1521 : 3 : replorigin_check_prerequisites(true, true);
1522 : :
1523 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1524 : 3 : flush = PG_GETARG_BOOL(1);
1525 : :
1526 : 3 : roident = replorigin_by_name(name, false);
1527 [ - + ]: 2 : Assert(OidIsValid(roident));
1528 : :
1529 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1530 : :
1531 [ - + ]: 2 : if (remote_lsn == InvalidXLogRecPtr)
3783 andres@anarazel.de 1532 :UBC 0 : PG_RETURN_NULL();
1533 : :
3783 andres@anarazel.de 1534 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1535 : : }
1536 : :
1537 : :
1538 : : Datum
1539 : 8 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1540 : : {
1541 : 8 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1542 : : int i;
1543 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1544 : :
1545 : : /* we want to return 0 rows if slot is set to zero */
1546 : 8 : replorigin_check_prerequisites(false, true);
1547 : :
1054 michael@paquier.xyz 1548 : 8 : InitMaterializedSRF(fcinfo, 0);
1549 : :
1550 : : /* prevent slots from being concurrently dropped */
3783 andres@anarazel.de 1551 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1552 : :
1553 : : /*
1554 : : * Iterate through all possible replication_states, display if they are
1555 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1556 : : * of date values are a possibility.
1557 : : */
169 msawada@postgresql.o 1558 [ + + ]: 88 : for (i = 0; i < max_active_replication_origins; i++)
1559 : : {
1560 : : ReplicationState *state;
1561 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1562 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1563 : : char *roname;
1564 : :
3783 andres@anarazel.de 1565 : 80 : state = &replication_states[i];
1566 : :
1567 : : /* unused slot, nothing to display */
1568 [ + + ]: 80 : if (state->roident == InvalidRepOriginId)
1569 : 67 : continue;
1570 : :
1571 : 13 : memset(values, 0, sizeof(values));
1572 : 13 : memset(nulls, 1, sizeof(nulls));
1573 : :
1574 : 13 : values[0] = ObjectIdGetDatum(state->roident);
1575 : 13 : nulls[0] = false;
1576 : :
1577 : : /*
1578 : : * We're not preventing the origin to be dropped concurrently, so
1579 : : * silently accept that it might be gone.
1580 : : */
1581 [ + + ]: 13 : if (replorigin_by_oid(state->roident, true,
1582 : : &roname))
1583 : : {
1584 : 11 : values[1] = CStringGetTextDatum(roname);
1585 : 11 : nulls[1] = false;
1586 : : }
1587 : :
1588 : 13 : LWLockAcquire(&state->lock, LW_SHARED);
1589 : :
3759 bruce@momjian.us 1590 : 13 : values[2] = LSNGetDatum(state->remote_lsn);
3783 andres@anarazel.de 1591 : 13 : nulls[2] = false;
1592 : :
1593 : 13 : values[3] = LSNGetDatum(state->local_lsn);
1594 : 13 : nulls[3] = false;
1595 : :
1596 : 13 : LWLockRelease(&state->lock);
1597 : :
1279 michael@paquier.xyz 1598 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1599 : : values, nulls);
1600 : : }
1601 : :
3783 andres@anarazel.de 1602 : 8 : LWLockRelease(ReplicationOriginLock);
1603 : :
1604 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1605 : :
1606 : 8 : return (Datum) 0;
1607 : : }
|