Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consists of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origins have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allows us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can setup one replication origin that's from then used as
38 : : * the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
47 : : * * To create and drop replication origins an exclusive lock on
48 : : * pg_replication_origin is required for the duration. That allows us to
49 : : * safely and conflict free assign new origins using a dirty snapshot.
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held, the same when advancing the
54 : : * replication progress of an individual backend that has not setup as the
55 : : * session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) is atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "utils/builtins.h"
92 : : #include "utils/fmgroids.h"
93 : : #include "utils/guc.h"
94 : : #include "utils/pg_lsn.h"
95 : : #include "utils/rel.h"
96 : : #include "utils/snapmgr.h"
97 : : #include "utils/syscache.h"
98 : : #include "utils/wait_event.h"
99 : :
100 : : /* paths for replication origin checkpoint files; the checkpoint is written to the .tmp file and then renamed to the final name */
101 : : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
102 : : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
103 : :
104 : : /* GUC variables: length of the shared replication_states array; when 0, no shared state is allocated and no checkpoint file is written */
105 : : int max_active_replication_origins = 10;
106 : :
107 : : /*
108 : : * Replay progress of a single remote node.
: : *
: : * One of these exists per element of the shared replication_states array.
109 : : */
110 : : typedef struct ReplicationState
111 : : {
112 : : /*
113 : : * Local identifier for the remote node.
: : * InvalidReplOriginId marks an array element that is not in use; such
: : * elements are skipped when the checkpoint file is written.
114 : : */
115 : : ReplOriginId roident;
116 : :
117 : : /*
118 : : * Location of the latest commit from the remote side.
119 : : */
120 : : XLogRecPtr remote_lsn;
121 : :
122 : : /*
123 : : * Remember the local lsn of the commit record so we can XLogFlush() to it
124 : : * during a checkpoint so we know the commit record actually is safe on
125 : : * disk.
126 : : */
127 : : XLogRecPtr local_lsn;
128 : :
129 : : /*
130 : : * PID of backend that's acquired slot, or 0 if none.
131 : : */
132 : : int acquired_by;
133 : :
134 : : /* Count of processes that are currently using this origin.
: : * replorigin_state_clear() treats the entry as busy while this is > 0. */
135 : : int refcount;
136 : :
137 : : /*
138 : : * Condition variable that's signaled when acquired_by changes.
: : * replorigin_state_clear() sleeps on it while waiting for a busy entry.
139 : : */
140 : : ConditionVariable origin_cv;
141 : :
142 : : /*
143 : : * Lock protecting remote_lsn and local_lsn.
144 : : */
145 : : LWLock lock;
146 : : } ReplicationState;
147 : :
148 : : /*
149 : : * On disk version of ReplicationState.
: : *
: : * CheckPointReplicationOrigin() writes one of these for every in-use
: : * entry, zeroing the struct first so that padding bytes are not left
: : * uninitialized. Only roident and remote_lsn are persisted.
150 : : */
151 : : typedef struct ReplicationStateOnDisk
152 : : {
153 : : ReplOriginId roident;
154 : : XLogRecPtr remote_lsn;
155 : : } ReplicationStateOnDisk;
156 : :
157 : :
158 : : typedef struct ReplicationStateCtl
159 : : {
160 : : /* Tranche to use for per-origin LWLocks; set to LWTRANCHE_REPLICATION_ORIGIN_STATE by ReplicationOriginShmemInit() */
161 : : int tranche_id;
162 : : /* Array of length max_active_replication_origins; the space for it is accounted for in ReplicationOriginShmemSize() */
163 : : ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
164 : : } ReplicationStateCtl;
165 : :
166 : : /* Global variable for per-transaction replication origin state; starts out with no origin, no origin LSN and a zero timestamp */
167 : : ReplOriginXactState replorigin_xact_state = {
168 : : .origin = InvalidReplOriginId, /* assumed identity */
169 : : .origin_lsn = InvalidXLogRecPtr,
170 : : .origin_timestamp = 0
171 : : };
172 : :
173 : : /*
174 : : * Base address into a shared memory array of replication states of size
175 : : * max_active_replication_origins.
: : *
: : * Pointed at replication_states_ctl->states by ReplicationOriginShmemInit();
: : * not assigned there when max_active_replication_origins is 0.
176 : : */
177 : : static ReplicationState *replication_states;
178 : :
179 : : /*
180 : : * Actual shared memory block (replication_states[] is now part of this).
181 : : */
182 : : static ReplicationStateCtl *replication_states_ctl;
183 : :
184 : : /*
185 : : * We keep a pointer to this backend's ReplicationState to avoid having to
186 : : * search the replication_states array in replorigin_session_advance for each
187 : : * remote commit. (Ownership of a backend's own entry can only be changed by
188 : : * that backend.)
189 : : */
190 : : static ReplicationState *session_replication_state = NULL;
191 : :
192 : : /* Magic for on disk files: written first in the checkpoint file and verified by StartupReplicationOrigin(). */
193 : : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
194 : :
195 : : static void
359 msawada@postgresql.o 196 :CBC 68 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
197 : : {
198 [ + + - + ]: 68 : if (check_origins && max_active_replication_origins == 0)
3973 andres@anarazel.de 199 [ # # ]:UBC 0 : ereport(ERROR,
200 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
201 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
202 : :
3973 andres@anarazel.de 203 [ + + - + ]:CBC 68 : if (!recoveryOK && RecoveryInProgress())
3973 andres@anarazel.de 204 [ # # ]:UBC 0 : ereport(ERROR,
205 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
206 : : errmsg("cannot manipulate replication origins during recovery")));
3973 andres@anarazel.de 207 :CBC 68 : }
208 : :
209 : :
210 : : /*
211 : : * IsReservedOriginName
212 : : * True iff name is either "none" or "any".
213 : : */
214 : : static bool
1333 akapila@postgresql.o 215 : 13 : IsReservedOriginName(const char *name)
216 : : {
217 [ + + + + ]: 25 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
218 : 12 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
219 : : }
220 : :
221 : : /* ---------------------------------------------------------------------------
222 : : * Functions for working with replication origins themselves.
223 : : * ---------------------------------------------------------------------------
224 : : */
225 : :
226 : : /*
227 : : * Check for a persistent replication origin identified by name.
228 : : *
229 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
230 : : */
231 : : ReplOriginId
1739 peter@eisentraut.org 232 : 1046 : replorigin_by_name(const char *roname, bool missing_ok)
233 : : {
234 : : Form_pg_replication_origin ident;
3949 bruce@momjian.us 235 : 1046 : Oid roident = InvalidOid;
236 : : HeapTuple tuple;
237 : : Datum roname_d;
238 : :
3973 andres@anarazel.de 239 : 1046 : roname_d = CStringGetTextDatum(roname);
240 : :
241 : 1046 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
242 [ + + ]: 1046 : if (HeapTupleIsValid(tuple))
243 : : {
244 : 653 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
245 : 653 : roident = ident->roident;
246 : 653 : ReleaseSysCache(tuple);
247 : : }
248 [ + + ]: 393 : else if (!missing_ok)
3083 rhaas@postgresql.org 249 [ + - ]: 4 : ereport(ERROR,
250 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
251 : : errmsg("replication origin \"%s\" does not exist",
252 : : roname)));
253 : :
3973 andres@anarazel.de 254 : 1042 : return roident;
255 : : }
256 : :
257 : : /*
258 : : * Create a replication origin.
259 : : *
260 : : * Needs to be called in a transaction.
261 : : */
262 : : ReplOriginId
1739 peter@eisentraut.org 263 : 388 : replorigin_create(const char *roname)
264 : : {
265 : : Oid roident;
3949 bruce@momjian.us 266 : 388 : HeapTuple tuple = NULL;
267 : : Relation rel;
268 : : Datum roname_d;
269 : : SnapshotData SnapshotDirty;
270 : : SysScanDesc scan;
271 : : ScanKeyData key;
272 : :
273 : : /*
274 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
275 : : * replication origin names to 512 bytes. This should be more than enough
276 : : * for all practical use.
277 : : */
312 nathan@postgresql.or 278 [ + + ]: 388 : if (strlen(roname) > MAX_RONAME_LEN)
279 [ + - ]: 3 : ereport(ERROR,
280 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
281 : : errmsg("replication origin name is too long"),
282 : : errdetail("Replication origin names must be no longer than %d bytes.",
283 : : MAX_RONAME_LEN)));
284 : :
3973 andres@anarazel.de 285 : 385 : roname_d = CStringGetTextDatum(roname);
286 : :
287 [ - + ]: 385 : Assert(IsTransactionState());
288 : :
289 : : /*
290 : : * We need the numeric replication origin to be 16bit wide, so we cannot
291 : : * rely on the normal oid allocation. Instead we simply scan
292 : : * pg_replication_origin for the first unused id. That's not particularly
293 : : * efficient, but this should be a fairly infrequent operation - we can
294 : : * easily spend a bit more code on this when it turns out it needs to be
295 : : * faster.
296 : : *
297 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
298 : : * over the table for the duration of the search. Because we use a "dirty
299 : : * snapshot" we can read rows that other in-progress sessions have
300 : : * written, even though they would be invisible with normal snapshots. Due
301 : : * to the exclusive lock there's no danger that new rows can appear while
302 : : * we're checking.
303 : : */
304 : 385 : InitDirtySnapshot(SnapshotDirty);
305 : :
2610 306 : 385 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
307 : :
308 : : /*
309 : : * We want to be able to access pg_replication_origin without setting up a
310 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
311 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
312 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
313 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
314 : : * be sure to set up a snapshot everywhere it might be needed. For more
315 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
316 : : */
312 nathan@postgresql.or 317 [ - + ]: 385 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
318 : :
3972 andres@anarazel.de 319 [ + - ]: 700 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
320 : : {
321 : : bool nulls[Natts_pg_replication_origin];
322 : : Datum values[Natts_pg_replication_origin];
323 : : bool collides;
324 : :
3973 325 [ - + ]: 700 : CHECK_FOR_INTERRUPTS();
326 : :
327 : 700 : ScanKeyInit(&key,
328 : : Anum_pg_replication_origin_roident,
329 : : BTEqualStrategyNumber, F_OIDEQ,
330 : : ObjectIdGetDatum(roident));
331 : :
332 : 700 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
333 : : true /* indexOK */ ,
334 : : &SnapshotDirty,
335 : : 1, &key);
336 : :
337 : 700 : collides = HeapTupleIsValid(systable_getnext(scan));
338 : :
339 : 700 : systable_endscan(scan);
340 : :
341 [ + + ]: 700 : if (!collides)
342 : : {
343 : : /*
344 : : * Ok, found an unused roident, insert the new row and do a CCI,
345 : : * so our callers can look it up if they want to.
346 : : */
347 : 385 : memset(&nulls, 0, sizeof(nulls));
348 : :
3949 bruce@momjian.us 349 : 385 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
3973 andres@anarazel.de 350 : 385 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
351 : :
352 : 385 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
3330 alvherre@alvh.no-ip. 353 : 385 : CatalogTupleInsert(rel, tuple);
3973 andres@anarazel.de 354 : 384 : CommandCounterIncrement();
355 : 384 : break;
356 : : }
357 : : }
358 : :
359 : : /* now release lock again, */
2610 360 : 384 : table_close(rel, ExclusiveLock);
361 : :
3973 362 [ - + ]: 384 : if (tuple == NULL)
3973 andres@anarazel.de 363 [ # # ]:UBC 0 : ereport(ERROR,
364 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
365 : : errmsg("could not find free replication origin ID")));
366 : :
3973 andres@anarazel.de 367 :CBC 384 : heap_freetuple(tuple);
368 : 384 : return roident;
369 : : }
370 : :
371 : : /*
372 : : * Helper function to drop a replication origin.
373 : : */
374 : : static void
46 msawada@postgresql.o 375 :GNC 332 : replorigin_state_clear(ReplOriginId roident, bool nowait)
376 : : {
377 : : int i;
378 : :
379 : : /*
380 : : * Clean up the slot state info, if there is any matching slot.
381 : : */
3141 alvherre@alvh.no-ip. 382 :CBC 336 : restart:
3973 andres@anarazel.de 383 : 336 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
384 : :
359 msawada@postgresql.o 385 [ + + ]: 1135 : for (i = 0; i < max_active_replication_origins; i++)
386 : : {
3973 andres@anarazel.de 387 : 1082 : ReplicationState *state = &replication_states[i];
388 : :
389 [ + + ]: 1082 : if (state->roident == roident)
390 : : {
391 : : /* found our slot, is it busy? */
60 akapila@postgresql.o 392 [ + + ]:GNC 283 : if (state->refcount > 0)
393 : : {
394 : : ConditionVariable *cv;
395 : :
3141 alvherre@alvh.no-ip. 396 [ - + ]:GBC 4 : if (nowait)
3141 alvherre@alvh.no-ip. 397 [ # # # # ]:UBC 0 : ereport(ERROR,
398 : : (errcode(ERRCODE_OBJECT_IN_USE),
399 : : (state->acquired_by != 0)
400 : : ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
401 : : state->roident,
402 : : state->acquired_by)
403 : : : errmsg("could not drop replication origin with ID %d, in use by another process",
404 : : state->roident)));
405 : :
406 : : /*
407 : : * We must wait and then retry. Since we don't know which CV
408 : : * to wait on until here, we can't readily use
409 : : * ConditionVariablePrepareToSleep (calling it here would be
410 : : * wrong, since we could miss the signal if we did so); just
411 : : * use ConditionVariableSleep directly.
412 : : */
3141 alvherre@alvh.no-ip. 413 :GBC 4 : cv = &state->origin_cv;
414 : :
415 : 4 : LWLockRelease(ReplicationOriginLock);
416 : :
417 : 4 : ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
418 : 4 : goto restart;
419 : : }
420 : :
421 : : /* first make a WAL log entry */
422 : : {
423 : : xl_replorigin_drop xlrec;
424 : :
3973 andres@anarazel.de 425 :CBC 279 : xlrec.node_id = roident;
426 : 279 : XLogBeginInsert();
397 peter@eisentraut.org 427 : 279 : XLogRegisterData(&xlrec, sizeof(xlrec));
3973 andres@anarazel.de 428 : 279 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
429 : : }
430 : :
431 : : /* then clear the in-memory slot */
46 msawada@postgresql.o 432 :GNC 279 : state->roident = InvalidReplOriginId;
3973 andres@anarazel.de 433 :CBC 279 : state->remote_lsn = InvalidXLogRecPtr;
434 : 279 : state->local_lsn = InvalidXLogRecPtr;
435 : 279 : break;
436 : : }
437 : : }
438 : 332 : LWLockRelease(ReplicationOriginLock);
2987 tgl@sss.pgh.pa.us 439 : 332 : ConditionVariableCancelSleep();
3973 andres@anarazel.de 440 : 332 : }
441 : :
442 : : /*
443 : : * Drop replication origin (by name).
444 : : *
445 : : * Needs to be called in a transaction.
446 : : */
447 : : void
1739 peter@eisentraut.org 448 : 524 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
449 : : {
450 : : ReplOriginId roident;
451 : : Relation rel;
452 : : HeapTuple tuple;
453 : :
1859 akapila@postgresql.o 454 [ - + ]: 524 : Assert(IsTransactionState());
455 : :
1136 456 : 524 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
457 : :
1859 458 : 524 : roident = replorigin_by_name(name, missing_ok);
459 : :
460 : : /* Lock the origin to prevent concurrent drops. */
1136 461 : 523 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
462 : : AccessExclusiveLock);
463 : :
464 : 523 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
465 [ + + ]: 523 : if (!HeapTupleIsValid(tuple))
466 : : {
467 [ - + ]: 191 : if (!missing_ok)
1136 akapila@postgresql.o 468 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
469 : : roident);
470 : :
471 : : /*
472 : : * We don't need to retain the locks if the origin is already dropped.
473 : : */
1136 akapila@postgresql.o 474 :CBC 191 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
475 : : AccessExclusiveLock);
476 : 191 : table_close(rel, RowExclusiveLock);
477 : 191 : return;
478 : : }
479 : :
480 : 332 : replorigin_state_clear(roident, nowait);
481 : :
482 : : /*
483 : : * Now, we can delete the catalog entry.
484 : : */
485 : 332 : CatalogTupleDelete(rel, &tuple->t_self);
486 : 332 : ReleaseSysCache(tuple);
487 : :
488 : 332 : CommandCounterIncrement();
489 : :
490 : : /* We keep the lock on pg_replication_origin until commit */
1859 491 : 332 : table_close(rel, NoLock);
492 : : }
493 : :
494 : : /*
495 : : * Lookup replication origin via its oid and return the name.
496 : : *
497 : : * The external name is palloc'd in the calling context.
498 : : *
499 : : * Returns true if the origin is known, false otherwise.
500 : : */
501 : : bool
46 msawada@postgresql.o 502 :GNC 28 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
503 : : {
504 : : HeapTuple tuple;
505 : : Form_pg_replication_origin ric;
506 : :
3973 andres@anarazel.de 507 [ - + ]:CBC 28 : Assert(OidIsValid((Oid) roident));
46 msawada@postgresql.o 508 [ - + ]:GNC 28 : Assert(roident != InvalidReplOriginId);
3973 andres@anarazel.de 509 [ - + ]:CBC 28 : Assert(roident != DoNotReplicateId);
510 : :
511 : 28 : tuple = SearchSysCache1(REPLORIGIDENT,
512 : : ObjectIdGetDatum((Oid) roident));
513 : :
514 [ + + ]: 28 : if (HeapTupleIsValid(tuple))
515 : : {
516 : 25 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
517 : 25 : *roname = text_to_cstring(&ric->roname);
518 : 25 : ReleaseSysCache(tuple);
519 : :
520 : 25 : return true;
521 : : }
522 : : else
523 : : {
524 : 3 : *roname = NULL;
525 : :
526 [ - + ]: 3 : if (!missing_ok)
3083 rhaas@postgresql.org 527 [ # # ]:UBC 0 : ereport(ERROR,
528 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
529 : : errmsg("replication origin with ID %d does not exist",
530 : : roident)));
531 : :
3973 andres@anarazel.de 532 :CBC 3 : return false;
533 : : }
534 : : }
535 : :
536 : :
537 : : /* ---------------------------------------------------------------------------
538 : : * Functions for handling replication progress.
539 : : * ---------------------------------------------------------------------------
540 : : */
541 : :
542 : : Size
543 : 4445 : ReplicationOriginShmemSize(void)
544 : : {
545 : 4445 : Size size = 0;
546 : :
359 msawada@postgresql.o 547 [ + + ]: 4445 : if (max_active_replication_origins == 0)
3973 andres@anarazel.de 548 : 2 : return size;
549 : :
550 : 4443 : size = add_size(size, offsetof(ReplicationStateCtl, states));
551 : :
552 : 4443 : size = add_size(size,
553 : : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
554 : 4443 : return size;
555 : : }
556 : :
557 : : void
558 : 1150 : ReplicationOriginShmemInit(void)
559 : : {
560 : : bool found;
561 : :
359 msawada@postgresql.o 562 [ + + ]: 1150 : if (max_active_replication_origins == 0)
3973 andres@anarazel.de 563 : 1 : return;
564 : :
565 : 1149 : replication_states_ctl = (ReplicationStateCtl *)
566 : 1149 : ShmemInitStruct("ReplicationOriginState",
567 : : ReplicationOriginShmemSize(),
568 : : &found);
3949 bruce@momjian.us 569 : 1149 : replication_states = replication_states_ctl->states;
570 : :
3973 andres@anarazel.de 571 [ + - ]: 1149 : if (!found)
572 : : {
573 : : int i;
574 : :
2130 tgl@sss.pgh.pa.us 575 [ + - + - : 94146 : MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
+ - + - +
+ ]
576 : :
577 : 1149 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
578 : :
359 msawada@postgresql.o 579 [ + + ]: 12630 : for (i = 0; i < max_active_replication_origins; i++)
580 : : {
3973 andres@anarazel.de 581 : 11481 : LWLockInitialize(&replication_states[i].lock,
582 : 11481 : replication_states_ctl->tranche_id);
3141 alvherre@alvh.no-ip. 583 : 11481 : ConditionVariableInit(&replication_states[i].origin_cv);
584 : : }
585 : : }
586 : : }
587 : :
588 : : /* ---------------------------------------------------------------------------
589 : : * Perform a checkpoint of each replication origin's progress with respect to
590 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
591 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
592 : : * if the transactions were originally committed asynchronously.
593 : : *
594 : : * We store checkpoints in the following format:
595 : : * +-------+------------------------+------------------+-----+--------+
596 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
597 : : * +-------+------------------------+------------------+-----+--------+
598 : : *
599 : : * So its just the magic, followed by the statically sized
600 : : * ReplicationStateOnDisk structs. Note that the maximum number of
601 : : * ReplicationState is determined by max_active_replication_origins.
602 : : * ---------------------------------------------------------------------------
603 : : */
604 : : void
3973 andres@anarazel.de 605 : 1802 : CheckPointReplicationOrigin(void)
606 : : {
562 michael@paquier.xyz 607 : 1802 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
608 : 1802 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
609 : : int tmpfd;
610 : : int i;
3973 andres@anarazel.de 611 : 1802 : uint32 magic = REPLICATION_STATE_MAGIC;
612 : : pg_crc32c crc;
613 : :
359 msawada@postgresql.o 614 [ + + ]: 1802 : if (max_active_replication_origins == 0)
3973 andres@anarazel.de 615 : 1 : return;
616 : :
617 : 1801 : INIT_CRC32C(crc);
618 : :
619 : : /* make sure no old temp file is remaining */
620 [ + - - + ]: 1801 : if (unlink(tmppath) < 0 && errno != ENOENT)
3973 andres@anarazel.de 621 [ # # ]:UBC 0 : ereport(PANIC,
622 : : (errcode_for_file_access(),
623 : : errmsg("could not remove file \"%s\": %m",
624 : : tmppath)));
625 : :
626 : : /*
627 : : * no other backend can perform this at the same time; only one checkpoint
628 : : * can happen at a time.
629 : : */
3095 peter_e@gmx.net 630 :CBC 1801 : tmpfd = OpenTransientFile(tmppath,
631 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
3973 andres@anarazel.de 632 [ - + ]: 1801 : if (tmpfd < 0)
3973 andres@anarazel.de 633 [ # # ]:UBC 0 : ereport(PANIC,
634 : : (errcode_for_file_access(),
635 : : errmsg("could not create file \"%s\": %m",
636 : : tmppath)));
637 : :
638 : : /* write magic */
2779 michael@paquier.xyz 639 :CBC 1801 : errno = 0;
3973 andres@anarazel.de 640 [ - + ]: 1801 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
641 : : {
642 : : /* if write didn't set errno, assume problem is no disk space */
2524 michael@paquier.xyz 643 [ # # ]:UBC 0 : if (errno == 0)
644 : 0 : errno = ENOSPC;
3973 andres@anarazel.de 645 [ # # ]: 0 : ereport(PANIC,
646 : : (errcode_for_file_access(),
647 : : errmsg("could not write to file \"%s\": %m",
648 : : tmppath)));
649 : : }
3973 andres@anarazel.de 650 :CBC 1801 : COMP_CRC32C(crc, &magic, sizeof(magic));
651 : :
652 : : /* prevent concurrent creations/drops */
653 : 1801 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
654 : :
655 : : /* write actual data */
359 msawada@postgresql.o 656 [ + + ]: 19811 : for (i = 0; i < max_active_replication_origins; i++)
657 : : {
658 : : ReplicationStateOnDisk disk_state;
3973 andres@anarazel.de 659 : 18010 : ReplicationState *curstate = &replication_states[i];
660 : : XLogRecPtr local_lsn;
661 : :
46 msawada@postgresql.o 662 [ + + ]:GNC 18010 : if (curstate->roident == InvalidReplOriginId)
3973 andres@anarazel.de 663 :CBC 17957 : continue;
664 : :
665 : : /* zero, to avoid uninitialized padding bytes */
3248 666 : 53 : memset(&disk_state, 0, sizeof(disk_state));
667 : :
3973 668 : 53 : LWLockAcquire(&curstate->lock, LW_SHARED);
669 : :
670 : 53 : disk_state.roident = curstate->roident;
671 : :
672 : 53 : disk_state.remote_lsn = curstate->remote_lsn;
673 : 53 : local_lsn = curstate->local_lsn;
674 : :
675 : 53 : LWLockRelease(&curstate->lock);
676 : :
677 : : /* make sure we only write out a commit that's persistent */
678 : 53 : XLogFlush(local_lsn);
679 : :
2779 michael@paquier.xyz 680 : 53 : errno = 0;
3973 andres@anarazel.de 681 [ - + ]: 53 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
682 : : sizeof(disk_state))
683 : : {
684 : : /* if write didn't set errno, assume problem is no disk space */
2524 michael@paquier.xyz 685 [ # # ]:UBC 0 : if (errno == 0)
686 : 0 : errno = ENOSPC;
3973 andres@anarazel.de 687 [ # # ]: 0 : ereport(PANIC,
688 : : (errcode_for_file_access(),
689 : : errmsg("could not write to file \"%s\": %m",
690 : : tmppath)));
691 : : }
692 : :
3973 andres@anarazel.de 693 :CBC 53 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
694 : : }
695 : :
696 : 1801 : LWLockRelease(ReplicationOriginLock);
697 : :
698 : : /* write out the CRC */
699 : 1801 : FIN_CRC32C(crc);
2779 michael@paquier.xyz 700 : 1801 : errno = 0;
3973 andres@anarazel.de 701 [ - + ]: 1801 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
702 : : {
703 : : /* if write didn't set errno, assume problem is no disk space */
2524 michael@paquier.xyz 704 [ # # ]:UBC 0 : if (errno == 0)
705 : 0 : errno = ENOSPC;
3973 andres@anarazel.de 706 [ # # ]: 0 : ereport(PANIC,
707 : : (errcode_for_file_access(),
708 : : errmsg("could not write to file \"%s\": %m",
709 : : tmppath)));
710 : : }
711 : :
2444 peter@eisentraut.org 712 [ - + ]:CBC 1801 : if (CloseTransientFile(tmpfd) != 0)
2563 michael@paquier.xyz 713 [ # # ]:UBC 0 : ereport(PANIC,
714 : : (errcode_for_file_access(),
715 : : errmsg("could not close file \"%s\": %m",
716 : : tmppath)));
717 : :
718 : : /* fsync, rename to permanent file, fsync file and directory */
3658 andres@anarazel.de 719 :CBC 1801 : durable_rename(tmppath, path, PANIC);
720 : : }
721 : :
722 : : /*
723 : : * Recover replication replay status from checkpoint data saved earlier by
724 : : * CheckPointReplicationOrigin.
725 : : *
726 : : * This only needs to be called at startup and *not* during every checkpoint
727 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
728 : : * state thereafter can be recovered by looking at commit records.
729 : : */
730 : : void
3973 731 : 1000 : StartupReplicationOrigin(void)
732 : : {
562 michael@paquier.xyz 733 : 1000 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
734 : : int fd;
735 : : int readBytes;
3949 bruce@momjian.us 736 : 1000 : uint32 magic = REPLICATION_STATE_MAGIC;
737 : 1000 : int last_state = 0;
738 : : pg_crc32c file_crc;
739 : : pg_crc32c crc;
740 : :
741 : : /* don't want to overwrite already existing state */
742 : : #ifdef USE_ASSERT_CHECKING
743 : : static bool already_started = false;
744 : :
3973 andres@anarazel.de 745 [ - + ]: 1000 : Assert(!already_started);
746 : 1000 : already_started = true;
747 : : #endif
748 : :
359 msawada@postgresql.o 749 [ + + ]: 1000 : if (max_active_replication_origins == 0)
3973 andres@anarazel.de 750 : 52 : return;
751 : :
752 : 999 : INIT_CRC32C(crc);
753 : :
754 [ + + ]: 999 : elog(DEBUG2, "starting up replication origin progress state");
755 : :
3095 peter_e@gmx.net 756 : 999 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
757 : :
758 : : /*
759 : : * might have had max_active_replication_origins == 0 last run, or we just
760 : : * brought up a standby.
761 : : */
3973 andres@anarazel.de 762 [ + + + - ]: 999 : if (fd < 0 && errno == ENOENT)
763 : 51 : return;
764 [ - + ]: 948 : else if (fd < 0)
3973 andres@anarazel.de 765 [ # # ]:UBC 0 : ereport(PANIC,
766 : : (errcode_for_file_access(),
767 : : errmsg("could not open file \"%s\": %m",
768 : : path)));
769 : :
770 : : /* verify magic, that is written even if nothing was active */
3973 andres@anarazel.de 771 :CBC 948 : readBytes = read(fd, &magic, sizeof(magic));
772 [ - + ]: 948 : if (readBytes != sizeof(magic))
773 : : {
2797 michael@paquier.xyz 774 [ # # ]:UBC 0 : if (readBytes < 0)
775 [ # # ]: 0 : ereport(PANIC,
776 : : (errcode_for_file_access(),
777 : : errmsg("could not read file \"%s\": %m",
778 : : path)));
779 : : else
780 [ # # ]: 0 : ereport(PANIC,
781 : : (errcode(ERRCODE_DATA_CORRUPTED),
782 : : errmsg("could not read file \"%s\": read %d of %zu",
783 : : path, readBytes, sizeof(magic))));
784 : : }
3973 andres@anarazel.de 785 :CBC 948 : COMP_CRC32C(crc, &magic, sizeof(magic));
786 : :
787 [ - + ]: 948 : if (magic != REPLICATION_STATE_MAGIC)
3973 andres@anarazel.de 788 [ # # ]:UBC 0 : ereport(PANIC,
789 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
790 : : magic, REPLICATION_STATE_MAGIC)));
791 : :
792 : : /* we can skip locking here, no other access is possible */
793 : :
794 : : /* recover individual states, until there are no more to be found */
795 : : while (true)
3973 andres@anarazel.de 796 :CBC 31 : {
797 : : ReplicationStateOnDisk disk_state;
798 : :
799 : 979 : readBytes = read(fd, &disk_state, sizeof(disk_state));
800 : :
801 [ - + ]: 979 : if (readBytes < 0)
802 : : {
3973 andres@anarazel.de 803 [ # # ]:UBC 0 : ereport(PANIC,
804 : : (errcode_for_file_access(),
805 : : errmsg("could not read file \"%s\": %m",
806 : : path)));
807 : : }
808 : :
809 : : /* no further data */
104 peter@eisentraut.org 810 [ + + ]:GNC 979 : if (readBytes == sizeof(crc))
811 : : {
812 : 948 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
813 : 948 : break;
814 : : }
815 : :
3973 andres@anarazel.de 816 [ - + ]:CBC 31 : if (readBytes != sizeof(disk_state))
817 : : {
3973 andres@anarazel.de 818 [ # # ]:UBC 0 : ereport(PANIC,
819 : : (errcode_for_file_access(),
820 : : errmsg("could not read file \"%s\": read %d of %zu",
821 : : path, readBytes, sizeof(disk_state))));
822 : : }
823 : :
3973 andres@anarazel.de 824 :CBC 31 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
825 : :
359 msawada@postgresql.o 826 [ - + ]: 31 : if (last_state == max_active_replication_origins)
3973 andres@anarazel.de 827 [ # # ]:UBC 0 : ereport(PANIC,
828 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
829 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
830 : :
831 : : /* copy data to shared memory */
3973 andres@anarazel.de 832 :CBC 31 : replication_states[last_state].roident = disk_state.roident;
833 : 31 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
834 : 31 : last_state++;
835 : :
1927 peter@eisentraut.org 836 [ + - ]: 31 : ereport(LOG,
837 : : errmsg("recovered replication state of node %d to %X/%08X",
838 : : disk_state.roident,
839 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
840 : : }
841 : :
842 : : /* now check checksum */
3973 andres@anarazel.de 843 : 948 : FIN_CRC32C(crc);
844 [ - + ]: 948 : if (file_crc != crc)
3973 andres@anarazel.de 845 [ # # ]:UBC 0 : ereport(PANIC,
846 : : (errcode(ERRCODE_DATA_CORRUPTED),
847 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
848 : : crc, file_crc)));
849 : :
2444 peter@eisentraut.org 850 [ - + ]:CBC 948 : if (CloseTransientFile(fd) != 0)
2563 michael@paquier.xyz 851 [ # # ]:UBC 0 : ereport(PANIC,
852 : : (errcode_for_file_access(),
853 : : errmsg("could not close file \"%s\": %m",
854 : : path)));
855 : : }
856 : :
857 : : void
3973 andres@anarazel.de 858 :CBC 4 : replorigin_redo(XLogReaderState *record)
859 : : {
860 : 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
861 : :
862 [ + + - ]: 4 : switch (info)
863 : : {
864 : 2 : case XLOG_REPLORIGIN_SET:
865 : : {
866 : 2 : xl_replorigin_set *xlrec =
1031 tgl@sss.pgh.pa.us 867 : 2 : (xl_replorigin_set *) XLogRecGetData(record);
868 : :
3973 andres@anarazel.de 869 : 2 : replorigin_advance(xlrec->node_id,
870 : : xlrec->remote_lsn, record->EndRecPtr,
3949 bruce@momjian.us 871 : 2 : xlrec->force /* backward */ ,
872 : : false /* WAL log */ );
3973 andres@anarazel.de 873 : 2 : break;
874 : : }
875 : 2 : case XLOG_REPLORIGIN_DROP:
876 : : {
877 : : xl_replorigin_drop *xlrec;
878 : : int i;
879 : :
880 : 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
881 : :
359 msawada@postgresql.o 882 [ + - ]: 3 : for (i = 0; i < max_active_replication_origins; i++)
883 : : {
3973 andres@anarazel.de 884 : 3 : ReplicationState *state = &replication_states[i];
885 : :
886 : : /* found our slot */
887 [ + + ]: 3 : if (state->roident == xlrec->node_id)
888 : : {
889 : : /* reset entry */
46 msawada@postgresql.o 890 :GNC 2 : state->roident = InvalidReplOriginId;
3973 andres@anarazel.de 891 :CBC 2 : state->remote_lsn = InvalidXLogRecPtr;
892 : 2 : state->local_lsn = InvalidXLogRecPtr;
893 : 2 : break;
894 : : }
895 : : }
896 : 2 : break;
897 : : }
3973 andres@anarazel.de 898 :UBC 0 : default:
899 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
900 : : }
3973 andres@anarazel.de 901 :CBC 4 : }
902 : :
903 : :
904 : : /*
905 : : * Tell the replication origin progress machinery that a commit from 'node'
906 : : * that originated at the LSN remote_commit on the remote node was replayed
907 : : * successfully and that we don't need to do so again. In combination with
908 : : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
909 : : * that ensures we won't lose knowledge about that after a crash if the
910 : : * transaction had a persistent effect (think of asynchronous commits).
911 : : *
912 : : * local_commit needs to be a local LSN of the commit so that we can make sure
913 : : * upon a checkpoint that enough WAL has been persisted to disk.
914 : : *
915 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
916 : : * unless running in recovery.
917 : : */
918 : : void
46 msawada@postgresql.o 919 :GNC 250 : replorigin_advance(ReplOriginId node,
920 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
921 : : bool go_backward, bool wal_log)
922 : : {
923 : : int i;
3973 andres@anarazel.de 924 :CBC 250 : ReplicationState *replication_state = NULL;
925 : 250 : ReplicationState *free_state = NULL;
926 : :
46 msawada@postgresql.o 927 [ - + ]:GNC 250 : Assert(node != InvalidReplOriginId);
928 : :
929 : : /* we don't track DoNotReplicateId */
3973 andres@anarazel.de 930 [ - + ]:CBC 250 : if (node == DoNotReplicateId)
3973 andres@anarazel.de 931 :UBC 0 : return;
932 : :
933 : : /*
934 : : * XXX: For the case where this is called by WAL replay, it'd be more
935 : : * efficient to restore into a backend local hashtable and only dump into
936 : : * shmem after recovery is finished. Let's wait with implementing that
937 : : * till it's shown to be a measurable expense
938 : : */
939 : :
940 : : /* Lock exclusively, as we may have to create a new table entry. */
3973 andres@anarazel.de 941 :CBC 250 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
942 : :
943 : : /*
944 : : * Search for either an existing slot for the origin, or a free one we can
945 : : * use.
946 : : */
359 msawada@postgresql.o 947 [ + + ]: 2303 : for (i = 0; i < max_active_replication_origins; i++)
948 : : {
3973 andres@anarazel.de 949 : 2099 : ReplicationState *curstate = &replication_states[i];
950 : :
951 : : /* remember where to insert if necessary */
46 msawada@postgresql.o 952 [ + + + + ]:GNC 2099 : if (curstate->roident == InvalidReplOriginId &&
953 : : free_state == NULL)
954 : : {
3973 andres@anarazel.de 955 :CBC 204 : free_state = curstate;
956 : 204 : continue;
957 : : }
958 : :
959 : : /* not our slot */
960 [ + + ]: 1895 : if (curstate->roident != node)
961 : : {
962 : 1849 : continue;
963 : : }
964 : :
965 : : /* ok, found slot */
966 : 46 : replication_state = curstate;
967 : :
968 : 46 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
969 : :
970 : : /* Make sure it's not used by somebody else */
60 akapila@postgresql.o 971 [ - + ]:GNC 46 : if (replication_state->refcount > 0)
972 : : {
3973 andres@anarazel.de 973 [ # # # # ]:UBC 0 : ereport(ERROR,
974 : : (errcode(ERRCODE_OBJECT_IN_USE),
975 : : (replication_state->acquired_by != 0)
976 : : ? errmsg("replication origin with ID %d is already active for PID %d",
977 : : replication_state->roident,
978 : : replication_state->acquired_by)
979 : : : errmsg("replication origin with ID %d is already active in another process",
980 : : replication_state->roident)));
981 : : }
982 : :
3973 andres@anarazel.de 983 :CBC 46 : break;
984 : : }
985 : :
986 [ + + - + ]: 250 : if (replication_state == NULL && free_state == NULL)
3973 andres@anarazel.de 987 [ # # ]:UBC 0 : ereport(ERROR,
988 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
989 : : errmsg("could not find free replication state slot for replication origin with ID %d",
990 : : node),
991 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
992 : :
3973 andres@anarazel.de 993 [ + + ]:CBC 250 : if (replication_state == NULL)
994 : : {
995 : : /* initialize new slot */
996 : 204 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
997 : 204 : replication_state = free_state;
129 alvherre@kurilemu.de 998 [ - + ]:GNC 204 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
999 [ - + ]: 204 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
3973 andres@anarazel.de 1000 :CBC 204 : replication_state->roident = node;
1001 : : }
1002 : :
46 msawada@postgresql.o 1003 [ - + ]:GNC 250 : Assert(replication_state->roident != InvalidReplOriginId);
1004 : :
1005 : : /*
1006 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1007 : : * and the standby gets the message. Primarily this will be called during
1008 : : * WAL replay (of commit records) where no WAL logging is necessary.
1009 : : */
3973 andres@anarazel.de 1010 [ + + ]:CBC 250 : if (wal_log)
1011 : : {
1012 : : xl_replorigin_set xlrec;
1013 : :
1014 : 209 : xlrec.remote_lsn = remote_commit;
1015 : 209 : xlrec.node_id = node;
1016 : 209 : xlrec.force = go_backward;
1017 : :
1018 : 209 : XLogBeginInsert();
397 peter@eisentraut.org 1019 : 209 : XLogRegisterData(&xlrec, sizeof(xlrec));
1020 : :
3973 andres@anarazel.de 1021 : 209 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1022 : : }
1023 : :
1024 : : /*
1025 : : * Due to - harmless - race conditions during a checkpoint we could see
1026 : : * values here that are older than the ones we already have in memory. We
1027 : : * could also see older values for prepared transactions when the prepare
1028 : : * is sent at a later point of time along with commit prepared and there
1029 : : * are other transactions commits between prepare and commit prepared. See
1030 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1031 : : */
1032 [ + + + + ]: 250 : if (go_backward || replication_state->remote_lsn < remote_commit)
1033 : 243 : replication_state->remote_lsn = remote_commit;
129 alvherre@kurilemu.de 1034 [ + + + + ]:GNC 250 : if (XLogRecPtrIsValid(local_commit) &&
3973 andres@anarazel.de 1035 [ + - ]:CBC 38 : (go_backward || replication_state->local_lsn < local_commit))
1036 : 40 : replication_state->local_lsn = local_commit;
1037 : 250 : LWLockRelease(&replication_state->lock);
1038 : :
1039 : : /*
1040 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1041 : : * otherwise be dropped anytime.
1042 : : */
1043 : 250 : LWLockRelease(ReplicationOriginLock);
1044 : : }
1045 : :
1046 : :
1047 : : XLogRecPtr
46 msawada@postgresql.o 1048 :GNC 8 : replorigin_get_progress(ReplOriginId node, bool flush)
1049 : : {
1050 : : int i;
3973 andres@anarazel.de 1051 :CBC 8 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1052 : 8 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1053 : :
1054 : : /* prevent slots from being concurrently dropped */
1055 : 8 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1056 : :
359 msawada@postgresql.o 1057 [ + + ]: 38 : for (i = 0; i < max_active_replication_origins; i++)
1058 : : {
1059 : : ReplicationState *state;
1060 : :
3973 andres@anarazel.de 1061 : 35 : state = &replication_states[i];
1062 : :
1063 [ + + ]: 35 : if (state->roident == node)
1064 : : {
1065 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1066 : :
1067 : 5 : remote_lsn = state->remote_lsn;
1068 : 5 : local_lsn = state->local_lsn;
1069 : :
1070 : 5 : LWLockRelease(&state->lock);
1071 : :
1072 : 5 : break;
1073 : : }
1074 : : }
1075 : :
1076 : 8 : LWLockRelease(ReplicationOriginLock);
1077 : :
129 alvherre@kurilemu.de 1078 [ + + + - ]:GNC 8 : if (flush && XLogRecPtrIsValid(local_lsn))
3973 andres@anarazel.de 1079 :CBC 1 : XLogFlush(local_lsn);
1080 : :
1081 : 8 : return remote_lsn;
1082 : : }
1083 : :
1084 : : /* Helper function to reset the session replication origin */
1085 : : static void
60 akapila@postgresql.o 1086 :GNC 506 : replorigin_session_reset_internal(void)
1087 : : {
1088 : : ConditionVariable *cv;
1089 : :
1090 [ - + ]: 506 : Assert(session_replication_state != NULL);
1091 : :
3973 andres@anarazel.de 1092 :CBC 506 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1093 : :
1094 : : /* The origin must be held by at least one process at this point. */
60 akapila@postgresql.o 1095 [ - + ]:GNC 506 : Assert(session_replication_state->refcount > 0);
1096 : :
1097 : : /*
1098 : : * Reset the PID only if the current session is the first to set up this
1099 : : * origin. This avoids clearing the first process's PID when any other
1100 : : * session releases the origin.
1101 : : */
1102 [ + + ]: 506 : if (session_replication_state->acquired_by == MyProcPid)
3973 andres@anarazel.de 1103 :CBC 492 : session_replication_state->acquired_by = 0;
1104 : :
60 akapila@postgresql.o 1105 :GNC 506 : session_replication_state->refcount--;
1106 : :
1107 : 506 : cv = &session_replication_state->origin_cv;
1108 : 506 : session_replication_state = NULL;
1109 : :
3973 andres@anarazel.de 1110 :CBC 506 : LWLockRelease(ReplicationOriginLock);
1111 : :
60 akapila@postgresql.o 1112 :GNC 506 : ConditionVariableBroadcast(cv);
1113 : 506 : }
1114 : :
1115 : : /*
1116 : : * Tear down a (possibly) configured session replication origin during process
1117 : : * exit.
1118 : : */
1119 : : static void
1120 : 502 : ReplicationOriginExitCleanup(int code, Datum arg)
1121 : : {
1122 [ + + ]: 502 : if (session_replication_state == NULL)
1123 : 197 : return;
1124 : :
1125 : 305 : replorigin_session_reset_internal();
1126 : : }
1127 : :
1128 : : /*
1129 : : * Setup a replication origin in the shared memory struct if it doesn't
1130 : : * already exist and cache access to the specific ReplicationSlot so the
1131 : : * array doesn't have to be searched when calling
1132 : : * replorigin_session_advance().
1133 : : *
1134 : : * Normally only one such cached origin can exist per process so the cached
1135 : : * value can only be set again after the previous value is torn down with
1136 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1137 : : * (meaning the slot is not allowed to be already acquired by another process).
1138 : : *
1139 : : * However, sometimes multiple processes can safely re-use the same origin slot
1140 : : * (for example, multiple parallel apply processes can safely use the same
1141 : : * origin, provided they maintain commit order by allowing only one process to
1142 : : * commit at a time). For this case the first process must pass acquired_by =
1143 : : * 0, and then the other processes sharing that same origin can pass
1144 : : * acquired_by = PID of the first process.
1145 : : */
1146 : : void
46 msawada@postgresql.o 1147 : 508 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1148 : : {
1149 : : static bool registered_cleanup;
1150 : : int i;
3949 bruce@momjian.us 1151 :CBC 508 : int free_slot = -1;
1152 : :
3973 andres@anarazel.de 1153 [ + + ]: 508 : if (!registered_cleanup)
1154 : : {
1155 : 502 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1156 : 502 : registered_cleanup = true;
1157 : : }
1158 : :
359 msawada@postgresql.o 1159 [ - + ]: 508 : Assert(max_active_replication_origins > 0);
1160 : :
3973 andres@anarazel.de 1161 [ + + ]: 508 : if (session_replication_state != NULL)
1162 [ + - ]: 1 : ereport(ERROR,
1163 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1164 : : errmsg("cannot setup replication origin when one is already setup")));
1165 : :
1166 : : /* Lock exclusively, as we may have to create a new table entry. */
1167 : 507 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1168 : :
1169 : : /*
1170 : : * Search for either an existing slot for the origin, or a free one we can
1171 : : * use.
1172 : : */
359 msawada@postgresql.o 1173 [ + + ]: 2051 : for (i = 0; i < max_active_replication_origins; i++)
1174 : : {
3973 andres@anarazel.de 1175 : 1926 : ReplicationState *curstate = &replication_states[i];
1176 : :
1177 : : /* remember where to insert if necessary */
46 msawada@postgresql.o 1178 [ + + + + ]:GNC 1926 : if (curstate->roident == InvalidReplOriginId &&
1179 : : free_slot == -1)
1180 : : {
3973 andres@anarazel.de 1181 :CBC 127 : free_slot = i;
1182 : 127 : continue;
1183 : : }
1184 : :
1185 : : /* not our slot */
1186 [ + + ]: 1799 : if (curstate->roident != node)
1187 : 1417 : continue;
1188 : :
1161 akapila@postgresql.o 1189 [ + + - + ]: 382 : else if (curstate->acquired_by != 0 && acquired_by == 0)
1190 : : {
3973 andres@anarazel.de 1191 [ # # ]:UBC 0 : ereport(ERROR,
1192 : : (errcode(ERRCODE_OBJECT_IN_USE),
1193 : : errmsg("replication origin with ID %d is already active for PID %d",
1194 : : curstate->roident, curstate->acquired_by)));
1195 : : }
1196 : :
177 akapila@postgresql.o 1197 [ - + ]:GNC 382 : else if (curstate->acquired_by != acquired_by)
1198 : : {
177 akapila@postgresql.o 1199 [ # # ]:UNC 0 : ereport(ERROR,
1200 : : (errcode(ERRCODE_OBJECT_IN_USE),
1201 : : errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1202 : : node, acquired_by)));
1203 : : }
1204 : :
1205 : : /*
1206 : : * The origin is in use, but PID is not recorded. This can happen if
1207 : : * the process that originally acquired the origin exited without
1208 : : * releasing it. To ensure correctness, other processes cannot acquire
1209 : : * the origin until all processes currently using it have released it.
1210 : : */
60 akapila@postgresql.o 1211 [ + + - + ]:GNC 382 : else if (curstate->acquired_by == 0 && curstate->refcount > 0)
60 akapila@postgresql.o 1212 [ # # ]:UNC 0 : ereport(ERROR,
1213 : : (errcode(ERRCODE_OBJECT_IN_USE),
1214 : : errmsg("replication origin with ID %d is already active in another process",
1215 : : curstate->roident)));
1216 : :
1217 : : /* ok, found slot */
3973 andres@anarazel.de 1218 :CBC 382 : session_replication_state = curstate;
844 akapila@postgresql.o 1219 : 382 : break;
1220 : : }
1221 : :
1222 : :
3973 andres@anarazel.de 1223 [ + + - + ]: 507 : if (session_replication_state == NULL && free_slot == -1)
3973 andres@anarazel.de 1224 [ # # ]:UBC 0 : ereport(ERROR,
1225 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1226 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1227 : : node),
1228 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
3973 andres@anarazel.de 1229 [ + + ]:CBC 507 : else if (session_replication_state == NULL)
1230 : : {
177 akapila@postgresql.o 1231 [ + + ]:GNC 125 : if (acquired_by)
1232 [ + - ]: 1 : ereport(ERROR,
1233 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1234 : : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1235 : : acquired_by, node)));
1236 : :
1237 : : /* initialize new slot */
3973 andres@anarazel.de 1238 :CBC 124 : session_replication_state = &replication_states[free_slot];
129 alvherre@kurilemu.de 1239 [ - + ]:GNC 124 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1240 [ - + ]: 124 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
3973 andres@anarazel.de 1241 :CBC 124 : session_replication_state->roident = node;
1242 : : }
1243 : :
1244 : :
46 msawada@postgresql.o 1245 [ - + ]:GNC 506 : Assert(session_replication_state->roident != InvalidReplOriginId);
1246 : :
1161 akapila@postgresql.o 1247 [ + + ]:CBC 506 : if (acquired_by == 0)
1248 : : {
1249 : 492 : session_replication_state->acquired_by = MyProcPid;
60 akapila@postgresql.o 1250 [ - + ]:GNC 492 : Assert(session_replication_state->refcount == 0);
1251 : : }
1252 : : else
1253 : : {
1254 : : /*
1255 : : * Sanity check: the origin must already be acquired by the process
1256 : : * passed as input, and at least one process must be using it.
1257 : : */
177 1258 [ - + ]: 14 : Assert(session_replication_state->acquired_by == acquired_by);
60 1259 [ - + ]: 14 : Assert(session_replication_state->refcount > 0);
1260 : : }
1261 : :
1262 : 506 : session_replication_state->refcount++;
1263 : :
3973 andres@anarazel.de 1264 :CBC 506 : LWLockRelease(ReplicationOriginLock);
1265 : :
1266 : : /* probably this one is pointless */
3141 alvherre@alvh.no-ip. 1267 : 506 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
3973 andres@anarazel.de 1268 : 506 : }
1269 : :
1270 : : /*
1271 : : * Reset replay state previously setup in this session.
1272 : : *
1273 : : * This function may only be called if an origin was setup with
1274 : : * replorigin_session_setup().
1275 : : */
1276 : : void
1277 : 203 : replorigin_session_reset(void)
1278 : : {
359 msawada@postgresql.o 1279 [ - + ]: 203 : Assert(max_active_replication_origins != 0);
1280 : :
3973 andres@anarazel.de 1281 [ + + ]: 203 : if (session_replication_state == NULL)
1282 [ + - ]: 1 : ereport(ERROR,
1283 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1284 : : errmsg("no replication origin is configured")));
1285 : :
1286 : : /*
1287 : : * Restrict explicit resetting of the replication origin if it was first
1288 : : * acquired by this process and others are still using it. While the
1289 : : * system handles this safely (as happens if the first session exits
1290 : : * without calling reset), it is best to avoid doing so.
1291 : : */
60 akapila@postgresql.o 1292 [ + + ]:GNC 202 : if (session_replication_state->acquired_by == MyProcPid &&
1293 [ + + ]: 200 : session_replication_state->refcount > 1)
1294 [ + - ]: 1 : ereport(ERROR,
1295 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1296 : : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1297 : : session_replication_state->roident),
1298 : : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1299 : : errhint("Reset the replication origin in all other processes before retrying.")));
1300 : :
1301 : 201 : replorigin_session_reset_internal();
3973 andres@anarazel.de 1302 :CBC 201 : }
1303 : :
1304 : : /*
1305 : : * Do the same work replorigin_advance() does, just on the session's
1306 : : * configured origin.
1307 : : *
1308 : : * This is noticeably cheaper than using replorigin_advance().
1309 : : */
1310 : : void
1311 : 1125 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1312 : : {
1313 [ - + ]: 1125 : Assert(session_replication_state != NULL);
46 msawada@postgresql.o 1314 [ - + ]:GNC 1125 : Assert(session_replication_state->roident != InvalidReplOriginId);
1315 : :
3973 andres@anarazel.de 1316 :CBC 1125 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1317 [ + - ]: 1125 : if (session_replication_state->local_lsn < local_commit)
1318 : 1125 : session_replication_state->local_lsn = local_commit;
1319 [ + + ]: 1125 : if (session_replication_state->remote_lsn < remote_commit)
1320 : 523 : session_replication_state->remote_lsn = remote_commit;
1321 : 1125 : LWLockRelease(&session_replication_state->lock);
1322 : 1125 : }
1323 : :
1324 : : /*
1325 : : * Ask the machinery about the point up to which we successfully replayed
1326 : : * changes from an already setup replication origin.
1327 : : */
1328 : : XLogRecPtr
1329 : 281 : replorigin_session_get_progress(bool flush)
1330 : : {
1331 : : XLogRecPtr remote_lsn;
1332 : : XLogRecPtr local_lsn;
1333 : :
1334 [ - + ]: 281 : Assert(session_replication_state != NULL);
1335 : :
1336 : 281 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1337 : 281 : remote_lsn = session_replication_state->remote_lsn;
1338 : 281 : local_lsn = session_replication_state->local_lsn;
1339 : 281 : LWLockRelease(&session_replication_state->lock);
1340 : :
129 alvherre@kurilemu.de 1341 [ + + + - ]:GNC 281 : if (flush && XLogRecPtrIsValid(local_lsn))
3973 andres@anarazel.de 1342 :CBC 1 : XLogFlush(local_lsn);
1343 : :
1344 : 281 : return remote_lsn;
1345 : : }
1346 : :
1347 : : /*
1348 : : * Clear the per-transaction replication origin state.
1349 : : *
1350 : : * replorigin_session_origin is also cleared if clear_origin is set.
1351 : : */
1352 : : void
46 msawada@postgresql.o 1353 :GNC 795 : replorigin_xact_clear(bool clear_origin)
1354 : : {
1355 : 795 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1356 : 795 : replorigin_xact_state.origin_timestamp = 0;
1357 [ + - ]: 795 : if (clear_origin)
1358 : 795 : replorigin_xact_state.origin = InvalidReplOriginId;
1359 : 795 : }
1360 : :
1361 : :
1362 : : /* ---------------------------------------------------------------------------
1363 : : * SQL functions for working with replication origin.
1364 : : *
1365 : : * These mostly should be fairly short wrappers around more generic functions.
1366 : : * ---------------------------------------------------------------------------
1367 : : */
1368 : :
1369 : : /*
1370 : : * Create replication origin for the passed in name, and return the assigned
1371 : : * oid.
1372 : : */
1373 : : Datum
3973 andres@anarazel.de 1374 :CBC 14 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1375 : : {
1376 : : char *name;
1377 : : ReplOriginId roident;
1378 : :
1379 : 14 : replorigin_check_prerequisites(false, false);
1380 : :
1381 : 14 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1382 : :
1383 : : /*
1384 : : * Replication origins "any and "none" are reserved for system options.
1385 : : * The origins "pg_xxx" are reserved for internal use.
1386 : : */
1333 akapila@postgresql.o 1387 [ + + + + ]: 14 : if (IsReservedName(name) || IsReservedOriginName(name))
2451 tgl@sss.pgh.pa.us 1388 [ + - ]: 3 : ereport(ERROR,
1389 : : (errcode(ERRCODE_RESERVED_NAME),
1390 : : errmsg("replication origin name \"%s\" is reserved",
1391 : : name),
1392 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1393 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1394 : :
1395 : : /*
1396 : : * If built with appropriate switch, whine when regression-testing
1397 : : * conventions for replication origin names are violated.
1398 : : */
1399 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1400 : : if (strncmp(name, "regress_", 8) != 0)
1401 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1402 : : #endif
1403 : :
3973 andres@anarazel.de 1404 : 11 : roident = replorigin_create(name);
1405 : :
1406 : 7 : pfree(name);
1407 : :
1408 : 7 : PG_RETURN_OID(roident);
1409 : : }
1410 : :
1411 : : /*
1412 : : * Drop replication origin.
1413 : : */
1414 : : Datum
1415 : 9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1416 : : {
1417 : : char *name;
1418 : :
1419 : 9 : replorigin_check_prerequisites(false, false);
1420 : :
1421 : 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1422 : :
1859 akapila@postgresql.o 1423 : 9 : replorigin_drop_by_name(name, false, true);
1424 : :
3973 andres@anarazel.de 1425 : 8 : pfree(name);
1426 : :
1427 : 8 : PG_RETURN_VOID();
1428 : : }
1429 : :
1430 : : /*
1431 : : * Return oid of a replication origin.
1432 : : */
1433 : : Datum
3973 andres@anarazel.de 1434 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1435 : : {
1436 : : char *name;
1437 : : ReplOriginId roident;
1438 : :
1439 : 0 : replorigin_check_prerequisites(false, false);
1440 : :
1441 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1442 : 0 : roident = replorigin_by_name(name, true);
1443 : :
1444 : 0 : pfree(name);
1445 : :
1446 [ # # ]: 0 : if (OidIsValid(roident))
1447 : 0 : PG_RETURN_OID(roident);
1448 : 0 : PG_RETURN_NULL();
1449 : : }
1450 : :
1451 : : /*
1452 : : * Setup a replication origin for this session.
1453 : : */
1454 : : Datum
3973 andres@anarazel.de 1455 :CBC 11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1456 : : {
1457 : : char *name;
1458 : : ReplOriginId origin;
1459 : : int pid;
1460 : :
1461 : 11 : replorigin_check_prerequisites(true, false);
1462 : :
1463 : 11 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1464 : 11 : origin = replorigin_by_name(name, false);
177 akapila@postgresql.o 1465 :GNC 10 : pid = PG_GETARG_INT32(1);
1466 : 10 : replorigin_session_setup(origin, pid);
1467 : :
46 msawada@postgresql.o 1468 : 8 : replorigin_xact_state.origin = origin;
1469 : :
3973 andres@anarazel.de 1470 :CBC 8 : pfree(name);
1471 : :
1472 : 8 : PG_RETURN_VOID();
1473 : : }
1474 : :
1475 : : /*
1476 : : * Reset previously setup origin in this session
1477 : : */
1478 : : Datum
1479 : 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1480 : : {
1481 : 10 : replorigin_check_prerequisites(true, false);
1482 : :
1483 : 10 : replorigin_session_reset();
1484 : :
46 msawada@postgresql.o 1485 :GNC 8 : replorigin_xact_clear(true);
1486 : :
3973 andres@anarazel.de 1487 :CBC 8 : PG_RETURN_VOID();
1488 : : }
1489 : :
1490 : : /*
1491 : : * Has a replication origin been setup for this session.
1492 : : */
1493 : : Datum
3973 andres@anarazel.de 1494 :GBC 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1495 : : {
1496 : 4 : replorigin_check_prerequisites(false, false);
1497 : :
46 msawada@postgresql.o 1498 :GNC 4 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1499 : : }
1500 : :
1501 : :
1502 : : /*
1503 : : * Return the replication progress for origin setup in the current session.
1504 : : *
1505 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1506 : : * to a local transaction that has been flushed. This is useful if asynchronous
1507 : : * commits are used when replaying replicated transactions.
1508 : : */
1509 : : Datum
3973 andres@anarazel.de 1510 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1511 : : {
1512 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1513 : 2 : bool flush = PG_GETARG_BOOL(0);
1514 : :
1515 : 2 : replorigin_check_prerequisites(true, false);
1516 : :
1517 [ - + ]: 2 : if (session_replication_state == NULL)
3973 andres@anarazel.de 1518 [ # # ]:UBC 0 : ereport(ERROR,
1519 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1520 : : errmsg("no replication origin is configured")));
1521 : :
3973 andres@anarazel.de 1522 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1523 : :
129 alvherre@kurilemu.de 1524 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
3973 andres@anarazel.de 1525 :UBC 0 : PG_RETURN_NULL();
1526 : :
3973 andres@anarazel.de 1527 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1528 : : }
1529 : :
1530 : : Datum
1531 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1532 : : {
1533 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1534 : :
1535 : 1 : replorigin_check_prerequisites(true, false);
1536 : :
1537 [ - + ]: 1 : if (session_replication_state == NULL)
3973 andres@anarazel.de 1538 [ # # ]:UBC 0 : ereport(ERROR,
1539 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1540 : : errmsg("no replication origin is configured")));
1541 : :
46 msawada@postgresql.o 1542 :GNC 1 : replorigin_xact_state.origin_lsn = location;
1543 : 1 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1544 : :
3973 andres@anarazel.de 1545 :CBC 1 : PG_RETURN_VOID();
1546 : : }
1547 : :
1548 : : Datum
3973 andres@anarazel.de 1549 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1550 : : {
1551 : 0 : replorigin_check_prerequisites(true, false);
1552 : :
1553 : : /* Do not clear the session origin */
46 msawada@postgresql.o 1554 :UNC 0 : replorigin_xact_clear(false);
1555 : :
3973 andres@anarazel.de 1556 :UBC 0 : PG_RETURN_VOID();
1557 : : }
1558 : :
1559 : :
1560 : : Datum
3973 andres@anarazel.de 1561 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1562 : : {
3290 noah@leadboat.com 1563 : 3 : text *name = PG_GETARG_TEXT_PP(0);
3949 bruce@momjian.us 1564 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1565 : : ReplOriginId node;
1566 : :
3973 andres@anarazel.de 1567 : 3 : replorigin_check_prerequisites(true, false);
1568 : :
1569 : : /* lock to prevent the replication origin from vanishing */
1570 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1571 : :
1572 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1573 : :
1574 : : /*
1575 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1576 : : * xact hasn't committed yet. This is why this function should be used to
1577 : : * set up the initial replication state, but not for replay.
1578 : : */
1579 : 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1580 : : true /* go backward */ , true /* WAL log */ );
1581 : :
1582 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1583 : :
1584 : 2 : PG_RETURN_VOID();
1585 : : }
1586 : :
1587 : :
1588 : : /*
1589 : : * Return the replication progress for an individual replication origin.
1590 : : *
1591 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1592 : : * to a local transaction that has been flushed. This is useful if asynchronous
1593 : : * commits are used when replaying replicated transactions.
1594 : : */
1595 : : Datum
1596 : 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1597 : : {
1598 : : char *name;
1599 : : bool flush;
1600 : : ReplOriginId roident;
1601 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1602 : :
1603 : 3 : replorigin_check_prerequisites(true, true);
1604 : :
1605 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1606 : 3 : flush = PG_GETARG_BOOL(1);
1607 : :
1608 : 3 : roident = replorigin_by_name(name, false);
1609 [ - + ]: 2 : Assert(OidIsValid(roident));
1610 : :
1611 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1612 : :
129 alvherre@kurilemu.de 1613 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
3973 andres@anarazel.de 1614 :UBC 0 : PG_RETURN_NULL();
1615 : :
3973 andres@anarazel.de 1616 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1617 : : }
1618 : :
1619 : :
1620 : : Datum
1621 : 11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1622 : : {
1623 : 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1624 : : int i;
1625 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1626 : :
1627 : : /* we want to return 0 rows if slot is set to zero */
1628 : 11 : replorigin_check_prerequisites(false, true);
1629 : :
1244 michael@paquier.xyz 1630 : 11 : InitMaterializedSRF(fcinfo, 0);
1631 : :
1632 : : /* prevent slots from being concurrently dropped */
3973 andres@anarazel.de 1633 : 11 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1634 : :
1635 : : /*
1636 : : * Iterate through all possible replication_states, display if they are
1637 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1638 : : * of date values are a possibility.
1639 : : */
359 msawada@postgresql.o 1640 [ + + ]: 121 : for (i = 0; i < max_active_replication_origins; i++)
1641 : : {
1642 : : ReplicationState *state;
1643 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1644 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1645 : : char *roname;
1646 : :
3973 andres@anarazel.de 1647 : 110 : state = &replication_states[i];
1648 : :
1649 : : /* unused slot, nothing to display */
46 msawada@postgresql.o 1650 [ + + ]:GNC 110 : if (state->roident == InvalidReplOriginId)
3973 andres@anarazel.de 1651 :CBC 97 : continue;
1652 : :
1653 : 13 : memset(values, 0, sizeof(values));
1654 : 13 : memset(nulls, 1, sizeof(nulls));
1655 : :
1656 : 13 : values[0] = ObjectIdGetDatum(state->roident);
1657 : 13 : nulls[0] = false;
1658 : :
1659 : : /*
1660 : : * We're not preventing the origin to be dropped concurrently, so
1661 : : * silently accept that it might be gone.
1662 : : */
1663 [ + - ]: 13 : if (replorigin_by_oid(state->roident, true,
1664 : : &roname))
1665 : : {
1666 : 13 : values[1] = CStringGetTextDatum(roname);
1667 : 13 : nulls[1] = false;
1668 : : }
1669 : :
1670 : 13 : LWLockAcquire(&state->lock, LW_SHARED);
1671 : :
3949 bruce@momjian.us 1672 : 13 : values[2] = LSNGetDatum(state->remote_lsn);
3973 andres@anarazel.de 1673 : 13 : nulls[2] = false;
1674 : :
1675 : 13 : values[3] = LSNGetDatum(state->local_lsn);
1676 : 13 : nulls[3] = false;
1677 : :
1678 : 13 : LWLockRelease(&state->lock);
1679 : :
1469 michael@paquier.xyz 1680 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1681 : : values, nulls);
1682 : : }
1683 : :
3973 andres@anarazel.de 1684 : 11 : LWLockRelease(ReplicationOriginLock);
1685 : :
1686 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1687 : :
1688 : 11 : return (Datum) 0;
1689 : : }
|