Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * origin.c
4 : : * Logical replication progress tracking support.
5 : : *
6 : : * Copyright (c) 2013-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/backend/replication/logical/origin.c
10 : : *
11 : : * NOTES
12 : : *
13 : : * This file provides the following:
14 : : * * An infrastructure to name nodes in a replication setup
15 : : * * A facility to store and persist replication progress in an
16 : : * efficient and durable manner.
17 : : *
18 : : * Replication origin consists of a descriptive, user defined, external
19 : : * name and a short, thus space efficient, internal 2 byte one. This split
20 : : * exists because replication origins have to be stored in WAL and shared
21 : : * memory and long descriptors would be inefficient. For now only use 2 bytes
22 : : * for the internal id of a replication origin as it seems unlikely that there
23 : : * soon will be more than 65k nodes in one replication setup; and using only
24 : : * two bytes allows us to be more space efficient.
25 : : *
26 : : * Replication progress is tracked in a shared memory table
27 : : * (ReplicationState) that's dumped to disk every checkpoint. Entries
28 : : * ('slots') in this table are identified by the internal id. That's the case
29 : : * because it allows us to increase replication progress during crash
30 : : * recovery. To allow doing so we store the original LSN (from the originating
31 : : * system) of a transaction in the commit record. That allows us to recover the
32 : : * precise replayed state after crash recovery; without requiring synchronous
33 : : * commits. Allowing logical replication to use asynchronous commit is
34 : : * generally good for performance, but especially important as it allows a
35 : : * single threaded replay process to keep up with a source that has multiple
36 : : * backends generating changes concurrently. For efficiency and simplicity
37 : : * reasons a backend can set up one replication origin that is from then on
38 : : * used as the source of changes produced by the backend, until reset again.
39 : : *
40 : : * This infrastructure is intended to be used in cooperation with logical
41 : : * decoding. When replaying from a remote system the configured origin is
42 : : * provided to output plugins, allowing prevention of replication loops and
43 : : * other filtering.
44 : : *
45 : : * There are several levels of locking at work:
46 : : *
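47 : : * * To create a replication origin an exclusive lock on pg_replication_origin
48 : : * is required for the duration. That allows us to safely and conflict free
49 : : * assign new origin ids using a dirty snapshot. Dropping an origin instead takes a row exclusive lock on pg_replication_origin plus an access exclusive lock on the origin object itself.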
50 : : *
51 : : * * When creating an in-memory replication progress slot the ReplicationOrigin
52 : : * LWLock has to be held exclusively; when iterating over the replication
53 : : * progress a shared lock has to be held; the same applies when advancing the
54 : : * replication progress of an individual origin that has not been set up as
55 : : * the session's replication origin.
56 : : *
57 : : * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58 : : * replication progress slot, that slot's lwlock has to be held. That's
59 : : * primarily because we do not assume 8 byte writes (the LSN) are atomic on
60 : : * all our platforms, but it also simplifies memory ordering concerns
61 : : * between the remote and local lsn. We use a lwlock instead of a spinlock
62 : : * so it's less harmful to hold the lock over a WAL write
63 : : * (cf. AdvanceReplicationProgress).
64 : : *
65 : : * ---------------------------------------------------------------------------
66 : : */
67 : :
68 : : #include "postgres.h"
69 : :
70 : : #include <unistd.h>
71 : : #include <sys/stat.h>
72 : :
73 : : #include "access/genam.h"
74 : : #include "access/htup_details.h"
75 : : #include "access/table.h"
76 : : #include "access/xact.h"
77 : : #include "access/xloginsert.h"
78 : : #include "catalog/catalog.h"
79 : : #include "catalog/indexing.h"
80 : : #include "catalog/pg_subscription.h"
81 : : #include "funcapi.h"
82 : : #include "miscadmin.h"
83 : : #include "nodes/execnodes.h"
84 : : #include "pgstat.h"
85 : : #include "replication/origin.h"
86 : : #include "replication/slot.h"
87 : : #include "storage/condition_variable.h"
88 : : #include "storage/fd.h"
89 : : #include "storage/ipc.h"
90 : : #include "storage/lmgr.h"
91 : : #include "storage/subsystems.h"
92 : : #include "utils/builtins.h"
93 : : #include "utils/fmgroids.h"
94 : : #include "utils/guc.h"
95 : : #include "utils/pg_lsn.h"
96 : : #include "utils/rel.h"
97 : : #include "utils/snapmgr.h"
98 : : #include "utils/syscache.h"
99 : : #include "utils/wait_event.h"
100 : :
/*
 * Paths of the replication origin checkpoint file.  CheckPointReplicationOrigin
 * writes the temporary file and then durably renames it over the permanent
 * one; StartupReplicationOrigin reads the permanent file back at startup.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC variables
 *
 * max_active_replication_origins is the number of in-memory progress slots
 * (entries of replication_states[]).  When it is 0, no shared memory is
 * requested for origins and checkpointing/restoring of origin progress is
 * skipped entirely.
 */
int			max_active_replication_origins = 10;
107 : :
/*
 * Replay progress of a single remote node.
 *
 * One of these lives in each shared-memory slot of replication_states[].
 * A slot whose roident is InvalidReplOriginId is unused.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.  Assigned and cleared only while
	 * ReplicationOriginLock is held exclusively.
	 */
	ReplOriginId roident;

	/*
	 * Location of the latest commit from the remote side.  Protected by
	 * "lock".
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.  Protected by "lock".
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Count of processes that are currently using this origin.  While it is
	 * nonzero, replorigin_state_clear refuses (nowait) or waits before
	 * releasing the slot.
	 */
	int			refcount;

	/*
	 * Condition variable that's signaled when acquired_by changes;
	 * replorigin_state_clear sleeps on it while the slot is busy.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;

/*
 * On disk version of ReplicationState.
 *
 * The checkpoint file contains one of these per in-use origin, written and
 * read back as raw sizeof(ReplicationStateOnDisk) chunks.  This struct's
 * layout (including any padding, which the writer zeroes) is therefore the
 * on-disk format: changing it makes existing checkpoint files unreadable.
 */
typedef struct ReplicationStateOnDisk
{
	ReplOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
157 : :
158 : :
/*
 * Shared-memory control block for replication progress tracking.  Requested
 * by ReplicationOriginShmemRequest, sized for max_active_replication_origins
 * states, and only when that GUC is nonzero.
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;

/*
 * Global variable for per-transaction replication origin state.  Starts out
 * with no origin, no origin LSN and a zero origin timestamp.
 * NOTE(review): presumably set by replay code and consumed when commit
 * records are written -- confirm against callers outside this chunk.
 */
ReplOriginXactState replorigin_xact_state = {
	.origin = InvalidReplOriginId,	/* assumed identity */
	.origin_lsn = InvalidXLogRecPtr,
	.origin_timestamp = 0
};

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.  It aliases replication_states_ctl->states
 * and is set by ReplicationOriginShmemInit / ReplicationOriginShmemAttach;
 * it stays NULL when max_active_replication_origins is 0.
 */
static ReplicationState *replication_states;

static void ReplicationOriginShmemRequest(void *arg);
static void ReplicationOriginShmemInit(void *arg);
static void ReplicationOriginShmemAttach(void *arg);

/*
 * Shared memory lifecycle hooks for this module: request_fn reserves the
 * space, init_fn initializes it, attach_fn re-derives replication_states.
 * NOTE(review): presumably registered with the shmem subsystem declared in
 * storage/subsystems.h -- confirm.
 */
const ShmemCallbacks ReplicationOriginShmemCallbacks = {
	.request_fn = ReplicationOriginShmemRequest,
	.init_fn = ReplicationOriginShmemInit,
	.attach_fn = ReplicationOriginShmemAttach,
};

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit. (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files: written first in the checkpoint file by
 * CheckPointReplicationOrigin and verified by StartupReplicationOrigin.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
205 : :
206 : : static void
410 msawada@postgresql.o 207 :CBC 69 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
208 : : {
209 [ + + - + ]: 69 : if (check_origins && max_active_replication_origins == 0)
4024 andres@anarazel.de 210 [ # # ]:UBC 0 : ereport(ERROR,
211 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
212 : : errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
213 : :
4024 andres@anarazel.de 214 [ + + - + ]:CBC 69 : if (!recoveryOK && RecoveryInProgress())
4024 andres@anarazel.de 215 [ # # ]:UBC 0 : ereport(ERROR,
216 : : (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
217 : : errmsg("cannot manipulate replication origins during recovery")));
4024 andres@anarazel.de 218 :CBC 69 : }
219 : :
220 : :
221 : : /*
222 : : * IsReservedOriginName
223 : : * True iff name is either "none" or "any".
224 : : */
225 : : static bool
1384 akapila@postgresql.o 226 : 14 : IsReservedOriginName(const char *name)
227 : : {
228 [ + + + + ]: 27 : return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
229 : 13 : (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
230 : : }
231 : :
232 : : /* ---------------------------------------------------------------------------
233 : : * Functions for working with replication origins themselves.
234 : : * ---------------------------------------------------------------------------
235 : : */
236 : :
237 : : /*
238 : : * Check for a persistent replication origin identified by name.
239 : : *
240 : : * Returns InvalidOid if the node isn't known yet and missing_ok is true.
241 : : */
242 : : ReplOriginId
1790 peter@eisentraut.org 243 : 1083 : replorigin_by_name(const char *roname, bool missing_ok)
244 : : {
245 : : Form_pg_replication_origin ident;
4000 bruce@momjian.us 246 : 1083 : Oid roident = InvalidOid;
247 : : HeapTuple tuple;
248 : : Datum roname_d;
249 : :
4024 andres@anarazel.de 250 : 1083 : roname_d = CStringGetTextDatum(roname);
251 : :
252 : 1083 : tuple = SearchSysCache1(REPLORIGNAME, roname_d);
253 [ + + ]: 1083 : if (HeapTupleIsValid(tuple))
254 : : {
255 : 679 : ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
256 : 679 : roident = ident->roident;
257 : 679 : ReleaseSysCache(tuple);
258 : : }
259 [ + + ]: 404 : else if (!missing_ok)
3134 rhaas@postgresql.org 260 [ + - ]: 4 : ereport(ERROR,
261 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
262 : : errmsg("replication origin \"%s\" does not exist",
263 : : roname)));
264 : :
4024 andres@anarazel.de 265 : 1079 : return roident;
266 : : }
267 : :
268 : : /*
269 : : * Create a replication origin.
270 : : *
271 : : * Needs to be called in a transaction.
272 : : */
273 : : ReplOriginId
1790 peter@eisentraut.org 274 : 409 : replorigin_create(const char *roname)
275 : : {
276 : : Oid roident;
4000 bruce@momjian.us 277 : 409 : HeapTuple tuple = NULL;
278 : : Relation rel;
279 : : Datum roname_d;
280 : : SnapshotData SnapshotDirty;
281 : : SysScanDesc scan;
282 : : ScanKeyData key;
283 : :
284 : : /*
285 : : * To avoid needing a TOAST table for pg_replication_origin, we limit
286 : : * replication origin names to 512 bytes. This should be more than enough
287 : : * for all practical use.
288 : : */
363 nathan@postgresql.or 289 [ + + ]: 409 : if (strlen(roname) > MAX_RONAME_LEN)
290 [ + - ]: 4 : ereport(ERROR,
291 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
292 : : errmsg("replication origin name is too long"),
293 : : errdetail("Replication origin names must be no longer than %d bytes.",
294 : : MAX_RONAME_LEN)));
295 : :
4024 andres@anarazel.de 296 : 405 : roname_d = CStringGetTextDatum(roname);
297 : :
298 [ - + ]: 405 : Assert(IsTransactionState());
299 : :
300 : : /*
301 : : * We need the numeric replication origin to be 16bit wide, so we cannot
302 : : * rely on the normal oid allocation. Instead we simply scan
303 : : * pg_replication_origin for the first unused id. That's not particularly
304 : : * efficient, but this should be a fairly infrequent operation - we can
305 : : * easily spend a bit more code on this when it turns out it needs to be
306 : : * faster.
307 : : *
308 : : * We handle concurrency by taking an exclusive lock (allowing reads!)
309 : : * over the table for the duration of the search. Because we use a "dirty
310 : : * snapshot" we can read rows that other in-progress sessions have
311 : : * written, even though they would be invisible with normal snapshots. Due
312 : : * to the exclusive lock there's no danger that new rows can appear while
313 : : * we're checking.
314 : : */
315 : 405 : InitDirtySnapshot(SnapshotDirty);
316 : :
2661 317 : 405 : rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
318 : :
319 : : /*
320 : : * We want to be able to access pg_replication_origin without setting up a
321 : : * snapshot. To make that safe, it needs to not have a TOAST table, since
322 : : * TOASTed data cannot be fetched without a snapshot. As of this writing,
323 : : * its only varlena column is roname, which we limit to 512 bytes to avoid
324 : : * needing out-of-line storage. If you add a TOAST table to this catalog,
325 : : * be sure to set up a snapshot everywhere it might be needed. For more
326 : : * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
327 : : */
363 nathan@postgresql.or 328 [ - + ]: 405 : Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
329 : :
4023 andres@anarazel.de 330 [ + - ]: 728 : for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
331 : : {
332 : : bool nulls[Natts_pg_replication_origin];
333 : : Datum values[Natts_pg_replication_origin];
334 : : bool collides;
335 : :
4024 336 [ - + ]: 728 : CHECK_FOR_INTERRUPTS();
337 : :
338 : 728 : ScanKeyInit(&key,
339 : : Anum_pg_replication_origin_roident,
340 : : BTEqualStrategyNumber, F_OIDEQ,
341 : : ObjectIdGetDatum(roident));
342 : :
343 : 728 : scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
344 : : true /* indexOK */ ,
345 : : &SnapshotDirty,
346 : : 1, &key);
347 : :
348 : 728 : collides = HeapTupleIsValid(systable_getnext(scan));
349 : :
350 : 728 : systable_endscan(scan);
351 : :
352 [ + + ]: 728 : if (!collides)
353 : : {
354 : : /*
355 : : * Ok, found an unused roident, insert the new row and do a CCI,
356 : : * so our callers can look it up if they want to.
357 : : */
358 : 405 : memset(&nulls, 0, sizeof(nulls));
359 : :
4000 bruce@momjian.us 360 : 405 : values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
4024 andres@anarazel.de 361 : 405 : values[Anum_pg_replication_origin_roname - 1] = roname_d;
362 : :
363 : 405 : tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
3381 alvherre@alvh.no-ip. 364 : 405 : CatalogTupleInsert(rel, tuple);
4024 andres@anarazel.de 365 : 404 : CommandCounterIncrement();
366 : 404 : break;
367 : : }
368 : : }
369 : :
370 : : /* now release lock again, */
2661 371 : 404 : table_close(rel, ExclusiveLock);
372 : :
4024 373 [ - + ]: 404 : if (tuple == NULL)
4024 andres@anarazel.de 374 [ # # ]:UBC 0 : ereport(ERROR,
375 : : (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
376 : : errmsg("could not find free replication origin ID")));
377 : :
4024 andres@anarazel.de 378 :CBC 404 : heap_freetuple(tuple);
379 : 404 : return roident;
380 : : }
381 : :
/*
 * Helper for replorigin_drop_by_name: release the in-memory progress slot of
 * origin "roident", if there is one.
 *
 * When a slot with this roident exists, an XLOG_REPLORIGIN_DROP record is
 * written and the slot is reset to unused (roident = InvalidReplOriginId,
 * both LSNs invalidated).  When no slot matches, nothing is WAL-logged here.
 *
 * If the slot is still in use (refcount > 0): with nowait, raise ERROR;
 * otherwise sleep on the slot's condition variable and rescan from the top.
 *
 * The pg_replication_origin catalog row is not touched; the caller deletes
 * it.
 */
static void
replorigin_state_clear(ReplOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.
	 */
restart:
	/* exclusive: we may modify a slot's roident */
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_active_replication_origins; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->refcount > 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 (state->acquired_by != 0)
							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
									  state->roident,
									  state->acquired_by)
							 : errmsg("could not drop replication origin with ID %d, in use by another process",
									  state->roident)));

				/*
				 * We must wait and then retry. Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				/* rescan: the slot may have changed while we slept */
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData(&xlrec, sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidReplOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);
	/* undo any CV sleep preparation left over from the wait loop above */
	ConditionVariableCancelSleep();
}
452 : :
453 : : /*
454 : : * Drop replication origin (by name).
455 : : *
456 : : * Needs to be called in a transaction.
457 : : */
458 : : void
1790 peter@eisentraut.org 459 : 550 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
460 : : {
461 : : ReplOriginId roident;
462 : : Relation rel;
463 : : HeapTuple tuple;
464 : :
1910 akapila@postgresql.o 465 [ - + ]: 550 : Assert(IsTransactionState());
466 : :
1187 467 : 550 : rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
468 : :
1910 469 : 550 : roident = replorigin_by_name(name, missing_ok);
470 : :
471 : : /* Lock the origin to prevent concurrent drops. */
1187 472 : 549 : LockSharedObject(ReplicationOriginRelationId, roident, 0,
473 : : AccessExclusiveLock);
474 : :
475 : 549 : tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
476 [ + + ]: 549 : if (!HeapTupleIsValid(tuple))
477 : : {
478 [ - + ]: 198 : if (!missing_ok)
1187 akapila@postgresql.o 479 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for replication origin with ID %d",
480 : : roident);
481 : :
482 : : /*
483 : : * We don't need to retain the locks if the origin is already dropped.
484 : : */
1187 akapila@postgresql.o 485 :CBC 198 : UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
486 : : AccessExclusiveLock);
487 : 198 : table_close(rel, RowExclusiveLock);
488 : 198 : return;
489 : : }
490 : :
491 : 351 : replorigin_state_clear(roident, nowait);
492 : :
493 : : /*
494 : : * Now, we can delete the catalog entry.
495 : : */
496 : 351 : CatalogTupleDelete(rel, &tuple->t_self);
497 : 351 : ReleaseSysCache(tuple);
498 : :
499 : 351 : CommandCounterIncrement();
500 : :
501 : : /* We keep the lock on pg_replication_origin until commit */
1910 502 : 351 : table_close(rel, NoLock);
503 : : }
504 : :
505 : : /*
506 : : * Lookup replication origin via its oid and return the name.
507 : : *
508 : : * The external name is palloc'd in the calling context.
509 : : *
510 : : * Returns true if the origin is known, false otherwise.
511 : : */
512 : : bool
97 msawada@postgresql.o 513 :GNC 29 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
514 : : {
515 : : HeapTuple tuple;
516 : : Form_pg_replication_origin ric;
517 : :
4024 andres@anarazel.de 518 [ - + ]:CBC 29 : Assert(OidIsValid((Oid) roident));
97 msawada@postgresql.o 519 [ - + ]:GNC 29 : Assert(roident != InvalidReplOriginId);
4024 andres@anarazel.de 520 [ - + ]:CBC 29 : Assert(roident != DoNotReplicateId);
521 : :
522 : 29 : tuple = SearchSysCache1(REPLORIGIDENT,
523 : : ObjectIdGetDatum((Oid) roident));
524 : :
525 [ + + ]: 29 : if (HeapTupleIsValid(tuple))
526 : : {
527 : 26 : ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
528 : 26 : *roname = text_to_cstring(&ric->roname);
529 : 26 : ReleaseSysCache(tuple);
530 : :
531 : 26 : return true;
532 : : }
533 : : else
534 : : {
535 : 3 : *roname = NULL;
536 : :
537 [ - + ]: 3 : if (!missing_ok)
3134 rhaas@postgresql.org 538 [ # # ]:UBC 0 : ereport(ERROR,
539 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
540 : : errmsg("replication origin with ID %d does not exist",
541 : : roident)));
542 : :
4024 andres@anarazel.de 543 :CBC 3 : return false;
544 : : }
545 : : }
546 : :
547 : :
548 : : /* ---------------------------------------------------------------------------
549 : : * Functions for handling replication progress.
550 : : * ---------------------------------------------------------------------------
551 : : */
552 : :
553 : : static void
29 heikki.linnakangas@i 554 :GNC 1244 : ReplicationOriginShmemRequest(void *arg)
555 : : {
4024 andres@anarazel.de 556 :CBC 1244 : Size size = 0;
557 : :
410 msawada@postgresql.o 558 [ + + ]: 1244 : if (max_active_replication_origins == 0)
29 heikki.linnakangas@i 559 :GNC 1 : return;
560 : :
4024 andres@anarazel.de 561 :CBC 1243 : size = add_size(size, offsetof(ReplicationStateCtl, states));
562 : 1243 : size = add_size(size,
563 : : mul_size(max_active_replication_origins, sizeof(ReplicationState)));
29 heikki.linnakangas@i 564 :GNC 1243 : ShmemRequestStruct(.name = "ReplicationOriginState",
565 : : .size = size,
566 : : .ptr = (void **) &replication_states_ctl,
567 : : );
568 : : }
569 : :
570 : : static void
571 : 1241 : ReplicationOriginShmemInit(void *arg)
572 : : {
410 msawada@postgresql.o 573 [ + + ]:CBC 1241 : if (max_active_replication_origins == 0)
4024 andres@anarazel.de 574 : 1 : return;
575 : :
4000 bruce@momjian.us 576 : 1240 : replication_states = replication_states_ctl->states;
577 : :
29 heikki.linnakangas@i 578 :GNC 1240 : replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
579 : :
580 [ + + ]: 13631 : for (int i = 0; i < max_active_replication_origins; i++)
581 : : {
582 : 12391 : LWLockInitialize(&replication_states[i].lock,
583 : 12391 : replication_states_ctl->tranche_id);
584 : 12391 : ConditionVariableInit(&replication_states[i].origin_cv);
585 : : }
586 : : }
587 : :
588 : : static void
29 heikki.linnakangas@i 589 :UNC 0 : ReplicationOriginShmemAttach(void *arg)
590 : : {
591 [ # # ]: 0 : if (max_active_replication_origins == 0)
592 : 0 : return;
593 : :
594 : 0 : replication_states = replication_states_ctl->states;
595 : : }
596 : :
597 : : /* ---------------------------------------------------------------------------
598 : : * Perform a checkpoint of each replication origin's progress with respect to
599 : : * the replayed remote_lsn. Make sure that all transactions we refer to in the
600 : : * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
601 : : * if the transactions were originally committed asynchronously.
602 : : *
603 : : * We store checkpoints in the following format:
604 : : * +-------+------------------------+------------------+-----+--------+
605 : : * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
606 : : * +-------+------------------------+------------------+-----+--------+
607 : : *
608 : : * So its just the magic, followed by the statically sized
609 : : * ReplicationStateOnDisk structs. Note that the maximum number of
610 : : * ReplicationState is determined by max_active_replication_origins.
611 : : * ---------------------------------------------------------------------------
612 : : */
613 : : void
4024 andres@anarazel.de 614 :CBC 1944 : CheckPointReplicationOrigin(void)
615 : : {
613 michael@paquier.xyz 616 : 1944 : const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
617 : 1944 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
618 : : int tmpfd;
619 : : int i;
4024 andres@anarazel.de 620 : 1944 : uint32 magic = REPLICATION_STATE_MAGIC;
621 : : pg_crc32c crc;
622 : :
410 msawada@postgresql.o 623 [ + + ]: 1944 : if (max_active_replication_origins == 0)
4024 andres@anarazel.de 624 : 1 : return;
625 : :
626 : 1943 : INIT_CRC32C(crc);
627 : :
628 : : /* make sure no old temp file is remaining */
629 [ + - - + ]: 1943 : if (unlink(tmppath) < 0 && errno != ENOENT)
4024 andres@anarazel.de 630 [ # # ]:UBC 0 : ereport(PANIC,
631 : : (errcode_for_file_access(),
632 : : errmsg("could not remove file \"%s\": %m",
633 : : tmppath)));
634 : :
635 : : /*
636 : : * no other backend can perform this at the same time; only one checkpoint
637 : : * can happen at a time.
638 : : */
3146 peter_e@gmx.net 639 :CBC 1943 : tmpfd = OpenTransientFile(tmppath,
640 : : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
4024 andres@anarazel.de 641 [ - + ]: 1943 : if (tmpfd < 0)
4024 andres@anarazel.de 642 [ # # ]:UBC 0 : ereport(PANIC,
643 : : (errcode_for_file_access(),
644 : : errmsg("could not create file \"%s\": %m",
645 : : tmppath)));
646 : :
647 : : /* write magic */
2830 michael@paquier.xyz 648 :CBC 1943 : errno = 0;
4024 andres@anarazel.de 649 [ - + ]: 1943 : if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
650 : : {
651 : : /* if write didn't set errno, assume problem is no disk space */
2575 michael@paquier.xyz 652 [ # # ]:UBC 0 : if (errno == 0)
653 : 0 : errno = ENOSPC;
4024 andres@anarazel.de 654 [ # # ]: 0 : ereport(PANIC,
655 : : (errcode_for_file_access(),
656 : : errmsg("could not write to file \"%s\": %m",
657 : : tmppath)));
658 : : }
4024 andres@anarazel.de 659 :CBC 1943 : COMP_CRC32C(crc, &magic, sizeof(magic));
660 : :
661 : : /* prevent concurrent creations/drops */
662 : 1943 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
663 : :
664 : : /* write actual data */
410 msawada@postgresql.o 665 [ + + ]: 21373 : for (i = 0; i < max_active_replication_origins; i++)
666 : : {
667 : : ReplicationStateOnDisk disk_state;
4024 andres@anarazel.de 668 : 19430 : ReplicationState *curstate = &replication_states[i];
669 : : XLogRecPtr local_lsn;
670 : :
97 msawada@postgresql.o 671 [ + + ]:GNC 19430 : if (curstate->roident == InvalidReplOriginId)
4024 andres@anarazel.de 672 :CBC 19376 : continue;
673 : :
674 : : /* zero, to avoid uninitialized padding bytes */
3299 675 : 54 : memset(&disk_state, 0, sizeof(disk_state));
676 : :
4024 677 : 54 : LWLockAcquire(&curstate->lock, LW_SHARED);
678 : :
679 : 54 : disk_state.roident = curstate->roident;
680 : :
681 : 54 : disk_state.remote_lsn = curstate->remote_lsn;
682 : 54 : local_lsn = curstate->local_lsn;
683 : :
684 : 54 : LWLockRelease(&curstate->lock);
685 : :
686 : : /* make sure we only write out a commit that's persistent */
687 : 54 : XLogFlush(local_lsn);
688 : :
2830 michael@paquier.xyz 689 : 54 : errno = 0;
4024 andres@anarazel.de 690 [ - + ]: 54 : if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
691 : : sizeof(disk_state))
692 : : {
693 : : /* if write didn't set errno, assume problem is no disk space */
2575 michael@paquier.xyz 694 [ # # ]:UBC 0 : if (errno == 0)
695 : 0 : errno = ENOSPC;
4024 andres@anarazel.de 696 [ # # ]: 0 : ereport(PANIC,
697 : : (errcode_for_file_access(),
698 : : errmsg("could not write to file \"%s\": %m",
699 : : tmppath)));
700 : : }
701 : :
4024 andres@anarazel.de 702 :CBC 54 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
703 : : }
704 : :
705 : 1943 : LWLockRelease(ReplicationOriginLock);
706 : :
707 : : /* write out the CRC */
708 : 1943 : FIN_CRC32C(crc);
2830 michael@paquier.xyz 709 : 1943 : errno = 0;
4024 andres@anarazel.de 710 [ - + ]: 1943 : if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
711 : : {
712 : : /* if write didn't set errno, assume problem is no disk space */
2575 michael@paquier.xyz 713 [ # # ]:UBC 0 : if (errno == 0)
714 : 0 : errno = ENOSPC;
4024 andres@anarazel.de 715 [ # # ]: 0 : ereport(PANIC,
716 : : (errcode_for_file_access(),
717 : : errmsg("could not write to file \"%s\": %m",
718 : : tmppath)));
719 : : }
720 : :
2495 peter@eisentraut.org 721 [ - + ]:CBC 1943 : if (CloseTransientFile(tmpfd) != 0)
2614 michael@paquier.xyz 722 [ # # ]:UBC 0 : ereport(PANIC,
723 : : (errcode_for_file_access(),
724 : : errmsg("could not close file \"%s\": %m",
725 : : tmppath)));
726 : :
727 : : /* fsync, rename to permanent file, fsync file and directory */
3709 andres@anarazel.de 728 :CBC 1943 : durable_rename(tmppath, path, PANIC);
729 : : }
730 : :
731 : : /*
732 : : * Recover replication replay status from checkpoint data saved earlier by
733 : : * CheckPointReplicationOrigin.
734 : : *
735 : : * This only needs to be called at startup and *not* during every checkpoint
736 : : * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
737 : : * state thereafter can be recovered by looking at commit records.
738 : : */
739 : : void
4024 740 : 1077 : StartupReplicationOrigin(void)
741 : : {
613 michael@paquier.xyz 742 : 1077 : const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
743 : : int fd;
744 : : int readBytes;
4000 bruce@momjian.us 745 : 1077 : uint32 magic = REPLICATION_STATE_MAGIC;
746 : 1077 : int last_state = 0;
747 : : pg_crc32c file_crc;
748 : : pg_crc32c crc;
749 : :
750 : : /* don't want to overwrite already existing state */
751 : : #ifdef USE_ASSERT_CHECKING
752 : : static bool already_started = false;
753 : :
4024 andres@anarazel.de 754 [ - + ]: 1077 : Assert(!already_started);
755 : 1077 : already_started = true;
756 : : #endif
757 : :
410 msawada@postgresql.o 758 [ + + ]: 1077 : if (max_active_replication_origins == 0)
4024 andres@anarazel.de 759 : 58 : return;
760 : :
761 : 1076 : INIT_CRC32C(crc);
762 : :
763 [ + + ]: 1076 : elog(DEBUG2, "starting up replication origin progress state");
764 : :
3146 peter_e@gmx.net 765 : 1076 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
766 : :
767 : : /*
768 : : * might have had max_active_replication_origins == 0 last run, or we just
769 : : * brought up a standby.
770 : : */
4024 andres@anarazel.de 771 [ + + + - ]: 1076 : if (fd < 0 && errno == ENOENT)
772 : 57 : return;
773 [ - + ]: 1019 : else if (fd < 0)
4024 andres@anarazel.de 774 [ # # ]:UBC 0 : ereport(PANIC,
775 : : (errcode_for_file_access(),
776 : : errmsg("could not open file \"%s\": %m",
777 : : path)));
778 : :
779 : : /* verify magic, that is written even if nothing was active */
4024 andres@anarazel.de 780 :CBC 1019 : readBytes = read(fd, &magic, sizeof(magic));
781 [ - + ]: 1019 : if (readBytes != sizeof(magic))
782 : : {
2848 michael@paquier.xyz 783 [ # # ]:UBC 0 : if (readBytes < 0)
784 [ # # ]: 0 : ereport(PANIC,
785 : : (errcode_for_file_access(),
786 : : errmsg("could not read file \"%s\": %m",
787 : : path)));
788 : : else
789 [ # # ]: 0 : ereport(PANIC,
790 : : (errcode(ERRCODE_DATA_CORRUPTED),
791 : : errmsg("could not read file \"%s\": read %d of %zu",
792 : : path, readBytes, sizeof(magic))));
793 : : }
4024 andres@anarazel.de 794 :CBC 1019 : COMP_CRC32C(crc, &magic, sizeof(magic));
795 : :
796 [ - + ]: 1019 : if (magic != REPLICATION_STATE_MAGIC)
4024 andres@anarazel.de 797 [ # # ]:UBC 0 : ereport(PANIC,
798 : : (errmsg("replication checkpoint has wrong magic %u instead of %u",
799 : : magic, REPLICATION_STATE_MAGIC)));
800 : :
801 : : /* we can skip locking here, no other access is possible */
802 : :
803 : : /* recover individual states, until there are no more to be found */
804 : : while (true)
4024 andres@anarazel.de 805 :CBC 31 : {
806 : : ReplicationStateOnDisk disk_state;
807 : :
808 : 1050 : readBytes = read(fd, &disk_state, sizeof(disk_state));
809 : :
810 [ - + ]: 1050 : if (readBytes < 0)
811 : : {
4024 andres@anarazel.de 812 [ # # ]:UBC 0 : ereport(PANIC,
813 : : (errcode_for_file_access(),
814 : : errmsg("could not read file \"%s\": %m",
815 : : path)));
816 : : }
817 : :
818 : : /* no further data */
155 peter@eisentraut.org 819 [ + + ]:GNC 1050 : if (readBytes == sizeof(crc))
820 : : {
821 : 1019 : memcpy(&file_crc, &disk_state, sizeof(file_crc));
822 : 1019 : break;
823 : : }
824 : :
4024 andres@anarazel.de 825 [ - + ]:CBC 31 : if (readBytes != sizeof(disk_state))
826 : : {
4024 andres@anarazel.de 827 [ # # ]:UBC 0 : ereport(PANIC,
828 : : (errcode_for_file_access(),
829 : : errmsg("could not read file \"%s\": read %d of %zu",
830 : : path, readBytes, sizeof(disk_state))));
831 : : }
832 : :
4024 andres@anarazel.de 833 :CBC 31 : COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
834 : :
410 msawada@postgresql.o 835 [ - + ]: 31 : if (last_state == max_active_replication_origins)
4024 andres@anarazel.de 836 [ # # ]:UBC 0 : ereport(PANIC,
837 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
838 : : errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
839 : :
840 : : /* copy data to shared memory */
4024 andres@anarazel.de 841 :CBC 31 : replication_states[last_state].roident = disk_state.roident;
842 : 31 : replication_states[last_state].remote_lsn = disk_state.remote_lsn;
843 : 31 : last_state++;
844 : :
1978 peter@eisentraut.org 845 [ + - ]: 31 : ereport(LOG,
846 : : errmsg("recovered replication state of node %d to %X/%08X",
847 : : disk_state.roident,
848 : : LSN_FORMAT_ARGS(disk_state.remote_lsn)));
849 : : }
850 : :
851 : : /* now check checksum */
4024 andres@anarazel.de 852 : 1019 : FIN_CRC32C(crc);
853 [ - + ]: 1019 : if (file_crc != crc)
4024 andres@anarazel.de 854 [ # # ]:UBC 0 : ereport(PANIC,
855 : : (errcode(ERRCODE_DATA_CORRUPTED),
856 : : errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
857 : : crc, file_crc)));
858 : :
2495 peter@eisentraut.org 859 [ - + ]:CBC 1019 : if (CloseTransientFile(fd) != 0)
2614 michael@paquier.xyz 860 [ # # ]:UBC 0 : ereport(PANIC,
861 : : (errcode_for_file_access(),
862 : : errmsg("could not close file \"%s\": %m",
863 : : path)));
864 : : }
865 : :
866 : : void
4024 andres@anarazel.de 867 :CBC 4 : replorigin_redo(XLogReaderState *record)
868 : : {
869 : 4 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
870 : :
871 [ + + - ]: 4 : switch (info)
872 : : {
873 : 2 : case XLOG_REPLORIGIN_SET:
874 : : {
875 : 2 : xl_replorigin_set *xlrec =
1082 tgl@sss.pgh.pa.us 876 : 2 : (xl_replorigin_set *) XLogRecGetData(record);
877 : :
4024 andres@anarazel.de 878 : 2 : replorigin_advance(xlrec->node_id,
879 : : xlrec->remote_lsn, record->EndRecPtr,
4000 bruce@momjian.us 880 : 2 : xlrec->force /* backward */ ,
881 : : false /* WAL log */ );
4024 andres@anarazel.de 882 : 2 : break;
883 : : }
884 : 2 : case XLOG_REPLORIGIN_DROP:
885 : : {
886 : : xl_replorigin_drop *xlrec;
887 : : int i;
888 : :
889 : 2 : xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
890 : :
410 msawada@postgresql.o 891 [ + - ]: 3 : for (i = 0; i < max_active_replication_origins; i++)
892 : : {
4024 andres@anarazel.de 893 : 3 : ReplicationState *state = &replication_states[i];
894 : :
895 : : /* found our slot */
896 [ + + ]: 3 : if (state->roident == xlrec->node_id)
897 : : {
898 : : /* reset entry */
97 msawada@postgresql.o 899 :GNC 2 : state->roident = InvalidReplOriginId;
4024 andres@anarazel.de 900 :CBC 2 : state->remote_lsn = InvalidXLogRecPtr;
901 : 2 : state->local_lsn = InvalidXLogRecPtr;
902 : 2 : break;
903 : : }
904 : : }
905 : 2 : break;
906 : : }
4024 andres@anarazel.de 907 :UBC 0 : default:
908 [ # # ]: 0 : elog(PANIC, "replorigin_redo: unknown op code %u", info);
909 : : }
4024 andres@anarazel.de 910 :CBC 4 : }
911 : :
912 : :
913 : : /*
914 : : * Tell the replication origin progress machinery that a commit from 'node'
915 : : * that originated at the LSN remote_commit on the remote node was replayed
916 : : * successfully and that we don't need to do so again. In combination with
917 : : * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
918 : : * that ensures we won't lose knowledge about that after a crash if the
919 : : * transaction had a persistent effect (think of asynchronous commits).
920 : : *
921 : : * local_commit needs to be a local LSN of the commit so that we can make sure
922 : : * upon a checkpoint that enough WAL has been persisted to disk.
923 : : *
924 : : * Needs to be called with a RowExclusiveLock on pg_replication_origin,
925 : : * unless running in recovery.
926 : : */
927 : : void
97 msawada@postgresql.o 928 :GNC 255 : replorigin_advance(ReplOriginId node,
929 : : XLogRecPtr remote_commit, XLogRecPtr local_commit,
930 : : bool go_backward, bool wal_log)
931 : : {
932 : : int i;
4024 andres@anarazel.de 933 :CBC 255 : ReplicationState *replication_state = NULL;
934 : 255 : ReplicationState *free_state = NULL;
935 : :
97 msawada@postgresql.o 936 [ - + ]:GNC 255 : Assert(node != InvalidReplOriginId);
937 : :
938 : : /* we don't track DoNotReplicateId */
4024 andres@anarazel.de 939 [ - + ]:CBC 255 : if (node == DoNotReplicateId)
4024 andres@anarazel.de 940 :UBC 0 : return;
941 : :
942 : : /*
943 : : * XXX: For the case where this is called by WAL replay, it'd be more
944 : : * efficient to restore into a backend local hashtable and only dump into
945 : : * shmem after recovery is finished. Let's wait with implementing that
946 : : * till it's shown to be a measurable expense
947 : : */
948 : :
949 : : /* Lock exclusively, as we may have to create a new table entry. */
4024 andres@anarazel.de 950 :CBC 255 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
951 : :
952 : : /*
953 : : * Search for either an existing slot for the origin, or a free one we can
954 : : * use.
955 : : */
410 msawada@postgresql.o 956 [ + + ]: 2349 : for (i = 0; i < max_active_replication_origins; i++)
957 : : {
4024 andres@anarazel.de 958 : 2141 : ReplicationState *curstate = &replication_states[i];
959 : :
960 : : /* remember where to insert if necessary */
97 msawada@postgresql.o 961 [ + + + + ]:GNC 2141 : if (curstate->roident == InvalidReplOriginId &&
962 : : free_state == NULL)
963 : : {
4024 andres@anarazel.de 964 :CBC 209 : free_state = curstate;
965 : 209 : continue;
966 : : }
967 : :
968 : : /* not our slot */
969 [ + + ]: 1932 : if (curstate->roident != node)
970 : : {
971 : 1885 : continue;
972 : : }
973 : :
974 : : /* ok, found slot */
975 : 47 : replication_state = curstate;
976 : :
977 : 47 : LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
978 : :
979 : : /* Make sure it's not used by somebody else */
111 akapila@postgresql.o 980 [ - + ]:GNC 47 : if (replication_state->refcount > 0)
981 : : {
4024 andres@anarazel.de 982 [ # # # # ]:UBC 0 : ereport(ERROR,
983 : : (errcode(ERRCODE_OBJECT_IN_USE),
984 : : (replication_state->acquired_by != 0)
985 : : ? errmsg("replication origin with ID %d is already active for PID %d",
986 : : replication_state->roident,
987 : : replication_state->acquired_by)
988 : : : errmsg("replication origin with ID %d is already active in another process",
989 : : replication_state->roident)));
990 : : }
991 : :
4024 andres@anarazel.de 992 :CBC 47 : break;
993 : : }
994 : :
995 [ + + - + ]: 255 : if (replication_state == NULL && free_state == NULL)
4024 andres@anarazel.de 996 [ # # ]:UBC 0 : ereport(ERROR,
997 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
998 : : errmsg("could not find free replication state slot for replication origin with ID %d",
999 : : node),
1000 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
1001 : :
4024 andres@anarazel.de 1002 [ + + ]:CBC 255 : if (replication_state == NULL)
1003 : : {
1004 : : /* initialize new slot */
1005 : 208 : LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
1006 : 208 : replication_state = free_state;
180 alvherre@kurilemu.de 1007 [ - + ]:GNC 208 : Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
1008 [ - + ]: 208 : Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
4024 andres@anarazel.de 1009 :CBC 208 : replication_state->roident = node;
1010 : : }
1011 : :
97 msawada@postgresql.o 1012 [ - + ]:GNC 255 : Assert(replication_state->roident != InvalidReplOriginId);
1013 : :
1014 : : /*
1015 : : * If somebody "forcefully" sets this slot, WAL log it, so it's durable
1016 : : * and the standby gets the message. Primarily this will be called during
1017 : : * WAL replay (of commit records) where no WAL logging is necessary.
1018 : : */
4024 andres@anarazel.de 1019 [ + + ]:CBC 255 : if (wal_log)
1020 : : {
1021 : : xl_replorigin_set xlrec;
1022 : :
1023 : 214 : xlrec.remote_lsn = remote_commit;
1024 : 214 : xlrec.node_id = node;
1025 : 214 : xlrec.force = go_backward;
1026 : :
1027 : 214 : XLogBeginInsert();
448 peter@eisentraut.org 1028 : 214 : XLogRegisterData(&xlrec, sizeof(xlrec));
1029 : :
4024 andres@anarazel.de 1030 : 214 : XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1031 : : }
1032 : :
1033 : : /*
1034 : : * Due to - harmless - race conditions during a checkpoint we could see
1035 : : * values here that are older than the ones we already have in memory. We
1036 : : * could also see older values for prepared transactions when the prepare
1037 : : * is sent at a later point of time along with commit prepared and there
1038 : : * are other transactions commits between prepare and commit prepared. See
1039 : : * ReorderBufferFinishPrepared. Don't overwrite those.
1040 : : */
1041 [ + + + + ]: 255 : if (go_backward || replication_state->remote_lsn < remote_commit)
1042 : 248 : replication_state->remote_lsn = remote_commit;
180 alvherre@kurilemu.de 1043 [ + + + + ]:GNC 255 : if (XLogRecPtrIsValid(local_commit) &&
4024 andres@anarazel.de 1044 [ + - ]:CBC 38 : (go_backward || replication_state->local_lsn < local_commit))
1045 : 40 : replication_state->local_lsn = local_commit;
1046 : 255 : LWLockRelease(&replication_state->lock);
1047 : :
1048 : : /*
1049 : : * Release *after* changing the LSNs, slot isn't acquired and thus could
1050 : : * otherwise be dropped anytime.
1051 : : */
1052 : 255 : LWLockRelease(ReplicationOriginLock);
1053 : : }
1054 : :
1055 : :
1056 : : XLogRecPtr
97 msawada@postgresql.o 1057 :GNC 9 : replorigin_get_progress(ReplOriginId node, bool flush)
1058 : : {
1059 : : int i;
4024 andres@anarazel.de 1060 :CBC 9 : XLogRecPtr local_lsn = InvalidXLogRecPtr;
1061 : 9 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1062 : :
1063 : : /* prevent slots from being concurrently dropped */
1064 : 9 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1065 : :
410 msawada@postgresql.o 1066 [ + + ]: 49 : for (i = 0; i < max_active_replication_origins; i++)
1067 : : {
1068 : : ReplicationState *state;
1069 : :
4024 andres@anarazel.de 1070 : 45 : state = &replication_states[i];
1071 : :
1072 [ + + ]: 45 : if (state->roident == node)
1073 : : {
1074 : 5 : LWLockAcquire(&state->lock, LW_SHARED);
1075 : :
1076 : 5 : remote_lsn = state->remote_lsn;
1077 : 5 : local_lsn = state->local_lsn;
1078 : :
1079 : 5 : LWLockRelease(&state->lock);
1080 : :
1081 : 5 : break;
1082 : : }
1083 : : }
1084 : :
1085 : 9 : LWLockRelease(ReplicationOriginLock);
1086 : :
180 alvherre@kurilemu.de 1087 [ + + + - ]:GNC 9 : if (flush && XLogRecPtrIsValid(local_lsn))
4024 andres@anarazel.de 1088 :CBC 1 : XLogFlush(local_lsn);
1089 : :
1090 : 9 : return remote_lsn;
1091 : : }
1092 : :
1093 : : /* Helper function to reset the session replication origin */
1094 : : static void
111 akapila@postgresql.o 1095 :GNC 516 : replorigin_session_reset_internal(void)
1096 : : {
1097 : : ConditionVariable *cv;
1098 : :
1099 [ - + ]: 516 : Assert(session_replication_state != NULL);
1100 : :
4024 andres@anarazel.de 1101 :CBC 516 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1102 : :
1103 : : /* The origin must be held by at least one process at this point. */
111 akapila@postgresql.o 1104 [ - + ]:GNC 516 : Assert(session_replication_state->refcount > 0);
1105 : :
1106 : : /*
1107 : : * Reset the PID only if the current session is the first to set up this
1108 : : * origin. This avoids clearing the first process's PID when any other
1109 : : * session releases the origin.
1110 : : */
1111 [ + + ]: 516 : if (session_replication_state->acquired_by == MyProcPid)
4024 andres@anarazel.de 1112 :CBC 502 : session_replication_state->acquired_by = 0;
1113 : :
111 akapila@postgresql.o 1114 :GNC 516 : session_replication_state->refcount--;
1115 : :
1116 : 516 : cv = &session_replication_state->origin_cv;
1117 : 516 : session_replication_state = NULL;
1118 : :
4024 andres@anarazel.de 1119 :CBC 516 : LWLockRelease(ReplicationOriginLock);
1120 : :
111 akapila@postgresql.o 1121 :GNC 516 : ConditionVariableBroadcast(cv);
1122 : 516 : }
1123 : :
1124 : : /*
1125 : : * Tear down a (possibly) configured session replication origin during process
1126 : : * exit.
1127 : : */
1128 : : static void
1129 : 512 : ReplicationOriginExitCleanup(int code, Datum arg)
1130 : : {
1131 [ + + ]: 512 : if (session_replication_state == NULL)
1132 : 202 : return;
1133 : :
1134 : 310 : replorigin_session_reset_internal();
1135 : : }
1136 : :
1137 : : /*
1138 : : * Setup a replication origin in the shared memory struct if it doesn't
1139 : : * already exist and cache access to the specific ReplicationSlot so the
1140 : : * array doesn't have to be searched when calling
1141 : : * replorigin_session_advance().
1142 : : *
1143 : : * Normally only one such cached origin can exist per process so the cached
1144 : : * value can only be set again after the previous value is torn down with
1145 : : * replorigin_session_reset(). For this normal case pass acquired_by = 0
1146 : : * (meaning the slot is not allowed to be already acquired by another process).
1147 : : *
1148 : : * However, sometimes multiple processes can safely re-use the same origin slot
1149 : : * (for example, multiple parallel apply processes can safely use the same
1150 : : * origin, provided they maintain commit order by allowing only one process to
1151 : : * commit at a time). For this case the first process must pass acquired_by =
1152 : : * 0, and then the other processes sharing that same origin can pass
1153 : : * acquired_by = PID of the first process.
1154 : : */
1155 : : void
97 msawada@postgresql.o 1156 : 518 : replorigin_session_setup(ReplOriginId node, int acquired_by)
1157 : : {
1158 : : static bool registered_cleanup;
1159 : : int i;
4000 bruce@momjian.us 1160 :CBC 518 : int free_slot = -1;
1161 : :
4024 andres@anarazel.de 1162 [ + + ]: 518 : if (!registered_cleanup)
1163 : : {
1164 : 512 : on_shmem_exit(ReplicationOriginExitCleanup, 0);
1165 : 512 : registered_cleanup = true;
1166 : : }
1167 : :
410 msawada@postgresql.o 1168 [ - + ]: 518 : Assert(max_active_replication_origins > 0);
1169 : :
4024 andres@anarazel.de 1170 [ + + ]: 518 : if (session_replication_state != NULL)
1171 [ + - ]: 1 : ereport(ERROR,
1172 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1173 : : errmsg("cannot setup replication origin when one is already setup")));
1174 : :
1175 : : /* Lock exclusively, as we may have to create a new table entry. */
1176 : 517 : LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1177 : :
1178 : : /*
1179 : : * Search for either an existing slot for the origin, or a free one we can
1180 : : * use.
1181 : : */
410 msawada@postgresql.o 1182 [ + + ]: 2077 : for (i = 0; i < max_active_replication_origins; i++)
1183 : : {
4024 andres@anarazel.de 1184 : 1951 : ReplicationState *curstate = &replication_states[i];
1185 : :
1186 : : /* remember where to insert if necessary */
97 msawada@postgresql.o 1187 [ + + + + ]:GNC 1951 : if (curstate->roident == InvalidReplOriginId &&
1188 : : free_slot == -1)
1189 : : {
4024 andres@anarazel.de 1190 :CBC 129 : free_slot = i;
1191 : 129 : continue;
1192 : : }
1193 : :
1194 : : /* not our slot */
1195 [ + + ]: 1822 : if (curstate->roident != node)
1196 : 1431 : continue;
1197 : :
40 akapila@postgresql.o 1198 [ + + ]:GNC 391 : if (acquired_by == 0)
1199 : : {
1200 : : /* With acquired_by == 0, we need the origin to be free */
1201 [ - + ]: 377 : if (curstate->acquired_by != 0)
1202 : : {
40 akapila@postgresql.o 1203 [ # # ]:UNC 0 : ereport(ERROR,
1204 : : (errcode(ERRCODE_OBJECT_IN_USE),
1205 : : errmsg("replication origin with ID %d is already active for PID %d",
1206 : : curstate->roident, curstate->acquired_by)));
1207 : : }
40 akapila@postgresql.o 1208 [ - + ]:GNC 377 : else if (curstate->refcount > 0)
1209 : : {
1210 : : /*
1211 : : * The origin is in use, but PID is not recorded. This can
1212 : : * happen if the process that originally acquired the origin
1213 : : * exited without releasing it. To ensure correctness, other
1214 : : * processes cannot acquire the origin until all processes
1215 : : * currently using it have released it.
1216 : : */
40 akapila@postgresql.o 1217 [ # # ]:UNC 0 : ereport(ERROR,
1218 : : (errcode(ERRCODE_OBJECT_IN_USE),
1219 : : errmsg("replication origin with ID %d is already active in another process",
1220 : : curstate->roident)));
1221 : : }
1222 : : }
1223 : : else
1224 : : {
1225 : : /*
1226 : : * With acquired_by != 0, we need the origin to be active by the
1227 : : * given PID
1228 : : */
40 akapila@postgresql.o 1229 [ - + ]:GNC 14 : if (curstate->acquired_by != acquired_by)
40 akapila@postgresql.o 1230 [ # # ]:UNC 0 : ereport(ERROR,
1231 : : (errcode(ERRCODE_OBJECT_IN_USE),
1232 : : errmsg("replication origin with ID %d is not active for PID %d",
1233 : : curstate->roident, acquired_by)));
1234 : :
1235 : : /*
1236 : : * Here, it is okay to have refcount > 0 as more than one process
1237 : : * can safely re-use the origin.
1238 : : */
1239 : : }
1240 : :
1241 : : /* ok, found slot */
4024 andres@anarazel.de 1242 :CBC 391 : session_replication_state = curstate;
895 akapila@postgresql.o 1243 : 391 : break;
1244 : : }
1245 : :
40 akapila@postgresql.o 1246 [ + + ]:GNC 517 : if (session_replication_state == NULL)
1247 : : {
1248 [ + + ]: 126 : if (acquired_by != 0)
228 1249 [ + - ]: 1 : ereport(ERROR,
1250 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1251 : : errmsg("cannot use PID %d for inactive replication origin with ID %d",
1252 : : acquired_by, node)));
1253 : :
1254 : : /* initialize new slot */
40 1255 [ - + ]: 125 : if (free_slot == -1)
40 akapila@postgresql.o 1256 [ # # ]:UNC 0 : ereport(ERROR,
1257 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1258 : : errmsg("could not find free replication state slot for replication origin with ID %d",
1259 : : node),
1260 : : errhint("Increase \"max_active_replication_origins\" and try again.")));
1261 : :
4024 andres@anarazel.de 1262 :CBC 125 : session_replication_state = &replication_states[free_slot];
180 alvherre@kurilemu.de 1263 [ - + ]:GNC 125 : Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
1264 [ - + ]: 125 : Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
4024 andres@anarazel.de 1265 :CBC 125 : session_replication_state->roident = node;
1266 : : }
1267 : :
1268 : :
97 msawada@postgresql.o 1269 [ - + ]:GNC 516 : Assert(session_replication_state->roident != InvalidReplOriginId);
1270 : :
1212 akapila@postgresql.o 1271 [ + + ]:CBC 516 : if (acquired_by == 0)
1272 : : {
1273 : 502 : session_replication_state->acquired_by = MyProcPid;
111 akapila@postgresql.o 1274 [ - + ]:GNC 502 : Assert(session_replication_state->refcount == 0);
1275 : : }
1276 : : else
1277 : : {
1278 : : /*
1279 : : * Sanity check: the origin must already be acquired by the process
1280 : : * passed as input, and at least one process must be using it.
1281 : : */
228 1282 [ - + ]: 14 : Assert(session_replication_state->acquired_by == acquired_by);
111 1283 [ - + ]: 14 : Assert(session_replication_state->refcount > 0);
1284 : : }
1285 : :
1286 : 516 : session_replication_state->refcount++;
1287 : :
4024 andres@anarazel.de 1288 :CBC 516 : LWLockRelease(ReplicationOriginLock);
1289 : :
1290 : : /* probably this one is pointless */
3192 alvherre@alvh.no-ip. 1291 : 516 : ConditionVariableBroadcast(&session_replication_state->origin_cv);
4024 andres@anarazel.de 1292 : 516 : }
1293 : :
1294 : : /*
1295 : : * Reset replay state previously setup in this session.
1296 : : *
1297 : : * This function may only be called if an origin was setup with
1298 : : * replorigin_session_setup().
1299 : : */
1300 : : void
1301 : 208 : replorigin_session_reset(void)
1302 : : {
410 msawada@postgresql.o 1303 [ - + ]: 208 : Assert(max_active_replication_origins != 0);
1304 : :
4024 andres@anarazel.de 1305 [ + + ]: 208 : if (session_replication_state == NULL)
1306 [ + - ]: 1 : ereport(ERROR,
1307 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1308 : : errmsg("no replication origin is configured")));
1309 : :
1310 : : /*
1311 : : * Restrict explicit resetting of the replication origin if it was first
1312 : : * acquired by this process and others are still using it. While the
1313 : : * system handles this safely (as happens if the first session exits
1314 : : * without calling reset), it is best to avoid doing so.
1315 : : */
111 akapila@postgresql.o 1316 [ + + ]:GNC 207 : if (session_replication_state->acquired_by == MyProcPid &&
1317 [ + + ]: 205 : session_replication_state->refcount > 1)
1318 [ + - ]: 1 : ereport(ERROR,
1319 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1320 : : errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
1321 : : session_replication_state->roident),
1322 : : errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
1323 : : errhint("Reset the replication origin in all other processes before retrying.")));
1324 : :
1325 : 206 : replorigin_session_reset_internal();
4024 andres@anarazel.de 1326 :CBC 206 : }
1327 : :
1328 : : /*
1329 : : * Do the same work replorigin_advance() does, just on the session's
1330 : : * configured origin.
1331 : : *
1332 : : * This is noticeably cheaper than using replorigin_advance().
1333 : : */
1334 : : void
1335 : 1161 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1336 : : {
1337 [ - + ]: 1161 : Assert(session_replication_state != NULL);
97 msawada@postgresql.o 1338 [ - + ]:GNC 1161 : Assert(session_replication_state->roident != InvalidReplOriginId);
1339 : :
4024 andres@anarazel.de 1340 :CBC 1161 : LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1341 [ + - ]: 1161 : if (session_replication_state->local_lsn < local_commit)
1342 : 1161 : session_replication_state->local_lsn = local_commit;
1343 [ + + ]: 1161 : if (session_replication_state->remote_lsn < remote_commit)
1344 : 544 : session_replication_state->remote_lsn = remote_commit;
1345 : 1161 : LWLockRelease(&session_replication_state->lock);
1346 : 1161 : }
1347 : :
1348 : : /*
1349 : : * Ask the machinery about the point up to which we successfully replayed
1350 : : * changes from an already setup replication origin.
1351 : : */
1352 : : XLogRecPtr
1353 : 286 : replorigin_session_get_progress(bool flush)
1354 : : {
1355 : : XLogRecPtr remote_lsn;
1356 : : XLogRecPtr local_lsn;
1357 : :
1358 [ - + ]: 286 : Assert(session_replication_state != NULL);
1359 : :
1360 : 286 : LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1361 : 286 : remote_lsn = session_replication_state->remote_lsn;
1362 : 286 : local_lsn = session_replication_state->local_lsn;
1363 : 286 : LWLockRelease(&session_replication_state->lock);
1364 : :
180 alvherre@kurilemu.de 1365 [ + + + - ]:GNC 286 : if (flush && XLogRecPtrIsValid(local_lsn))
4024 andres@anarazel.de 1366 :CBC 1 : XLogFlush(local_lsn);
1367 : :
1368 : 286 : return remote_lsn;
1369 : : }
1370 : :
1371 : : /*
1372 : : * Clear the per-transaction replication origin state.
1373 : : *
1374 : : * replorigin_xact_state.origin is also cleared if clear_origin is set.
1375 : : */
1376 : : void
97 msawada@postgresql.o 1377 :GNC 818 : replorigin_xact_clear(bool clear_origin)
1378 : : {
1379 : 818 : replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
1380 : 818 : replorigin_xact_state.origin_timestamp = 0;
1381 [ + - ]: 818 : if (clear_origin)
1382 : 818 : replorigin_xact_state.origin = InvalidReplOriginId;
1383 : 818 : }
1384 : :
1385 : :
1386 : : /* ---------------------------------------------------------------------------
1387 : : * SQL functions for working with replication origin.
1388 : : *
1389 : : * These mostly should be fairly short wrappers around more generic functions.
1390 : : * ---------------------------------------------------------------------------
1391 : : */
1392 : :
1393 : : /*
1394 : : * Create replication origin for the passed in name, and return the assigned
1395 : : * oid.
1396 : : */
1397 : : Datum
4024 andres@anarazel.de 1398 :CBC 15 : pg_replication_origin_create(PG_FUNCTION_ARGS)
1399 : : {
1400 : : char *name;
1401 : : ReplOriginId roident;
1402 : :
1403 : 15 : replorigin_check_prerequisites(false, false);
1404 : :
1405 : 15 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1406 : :
1407 : : /*
1408 : : * Replication origins "any and "none" are reserved for system options.
1409 : : * The origins "pg_xxx" are reserved for internal use.
1410 : : */
1384 akapila@postgresql.o 1411 [ + + + + ]: 15 : if (IsReservedName(name) || IsReservedOriginName(name))
2502 tgl@sss.pgh.pa.us 1412 [ + - ]: 3 : ereport(ERROR,
1413 : : (errcode(ERRCODE_RESERVED_NAME),
1414 : : errmsg("replication origin name \"%s\" is reserved",
1415 : : name),
1416 : : errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1417 : : LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1418 : :
1419 : : /*
1420 : : * If built with appropriate switch, whine when regression-testing
1421 : : * conventions for replication origin names are violated.
1422 : : */
1423 : : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1424 : : if (strncmp(name, "regress_", 8) != 0)
1425 : : elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1426 : : #endif
1427 : :
4024 andres@anarazel.de 1428 : 12 : roident = replorigin_create(name);
1429 : :
1430 : 7 : pfree(name);
1431 : :
1432 : 7 : PG_RETURN_OID(roident);
1433 : : }
1434 : :
1435 : : /*
1436 : : * Drop replication origin.
1437 : : */
1438 : : Datum
1439 : 9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
1440 : : {
1441 : : char *name;
1442 : :
1443 : 9 : replorigin_check_prerequisites(false, false);
1444 : :
1445 : 9 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1446 : :
1910 akapila@postgresql.o 1447 : 9 : replorigin_drop_by_name(name, false, true);
1448 : :
4024 andres@anarazel.de 1449 : 8 : pfree(name);
1450 : :
1451 : 8 : PG_RETURN_VOID();
1452 : : }
1453 : :
1454 : : /*
1455 : : * Return oid of a replication origin.
1456 : : */
1457 : : Datum
4024 andres@anarazel.de 1458 :UBC 0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
1459 : : {
1460 : : char *name;
1461 : : ReplOriginId roident;
1462 : :
1463 : 0 : replorigin_check_prerequisites(false, false);
1464 : :
1465 : 0 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1466 : 0 : roident = replorigin_by_name(name, true);
1467 : :
1468 : 0 : pfree(name);
1469 : :
1470 [ # # ]: 0 : if (OidIsValid(roident))
1471 : 0 : PG_RETURN_OID(roident);
1472 : 0 : PG_RETURN_NULL();
1473 : : }
1474 : :
1475 : : /*
1476 : : * Setup a replication origin for this session.
1477 : : */
1478 : : Datum
4024 andres@anarazel.de 1479 :CBC 11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1480 : : {
1481 : : char *name;
1482 : : ReplOriginId origin;
1483 : : int pid;
1484 : :
1485 : 11 : replorigin_check_prerequisites(true, false);
1486 : :
1487 : 11 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1488 : 11 : origin = replorigin_by_name(name, false);
228 akapila@postgresql.o 1489 :GNC 10 : pid = PG_GETARG_INT32(1);
1490 : 10 : replorigin_session_setup(origin, pid);
1491 : :
97 msawada@postgresql.o 1492 : 8 : replorigin_xact_state.origin = origin;
1493 : :
4024 andres@anarazel.de 1494 :CBC 8 : pfree(name);
1495 : :
1496 : 8 : PG_RETURN_VOID();
1497 : : }
1498 : :
1499 : : /*
1500 : : * Reset previously setup origin in this session
1501 : : */
1502 : : Datum
1503 : 10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1504 : : {
1505 : 10 : replorigin_check_prerequisites(true, false);
1506 : :
1507 : 10 : replorigin_session_reset();
1508 : :
97 msawada@postgresql.o 1509 :GNC 8 : replorigin_xact_clear(true);
1510 : :
4024 andres@anarazel.de 1511 :CBC 8 : PG_RETURN_VOID();
1512 : : }
1513 : :
1514 : : /*
1515 : : * Has a replication origin been setup for this session.
1516 : : */
1517 : : Datum
4024 andres@anarazel.de 1518 :GBC 4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1519 : : {
1520 : 4 : replorigin_check_prerequisites(false, false);
1521 : :
97 msawada@postgresql.o 1522 :GNC 4 : PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
1523 : : }
1524 : :
1525 : :
1526 : : /*
1527 : : * Return the replication progress for origin setup in the current session.
1528 : : *
1529 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1530 : : * to a local transaction that has been flushed. This is useful if asynchronous
1531 : : * commits are used when replaying replicated transactions.
1532 : : */
1533 : : Datum
4024 andres@anarazel.de 1534 :CBC 2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1535 : : {
1536 : 2 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1537 : 2 : bool flush = PG_GETARG_BOOL(0);
1538 : :
1539 : 2 : replorigin_check_prerequisites(true, false);
1540 : :
1541 [ - + ]: 2 : if (session_replication_state == NULL)
4024 andres@anarazel.de 1542 [ # # ]:UBC 0 : ereport(ERROR,
1543 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1544 : : errmsg("no replication origin is configured")));
1545 : :
4024 andres@anarazel.de 1546 :CBC 2 : remote_lsn = replorigin_session_get_progress(flush);
1547 : :
180 alvherre@kurilemu.de 1548 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
4024 andres@anarazel.de 1549 :UBC 0 : PG_RETURN_NULL();
1550 : :
4024 andres@anarazel.de 1551 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1552 : : }
1553 : :
1554 : : Datum
1555 : 1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1556 : : {
1557 : 1 : XLogRecPtr location = PG_GETARG_LSN(0);
1558 : :
1559 : 1 : replorigin_check_prerequisites(true, false);
1560 : :
1561 [ - + ]: 1 : if (session_replication_state == NULL)
4024 andres@anarazel.de 1562 [ # # ]:UBC 0 : ereport(ERROR,
1563 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1564 : : errmsg("no replication origin is configured")));
1565 : :
97 msawada@postgresql.o 1566 :GNC 1 : replorigin_xact_state.origin_lsn = location;
1567 : 1 : replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1568 : :
4024 andres@anarazel.de 1569 :CBC 1 : PG_RETURN_VOID();
1570 : : }
1571 : :
1572 : : Datum
4024 andres@anarazel.de 1573 :UBC 0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1574 : : {
1575 : 0 : replorigin_check_prerequisites(true, false);
1576 : :
1577 : : /* Do not clear the session origin */
97 msawada@postgresql.o 1578 :UNC 0 : replorigin_xact_clear(false);
1579 : :
4024 andres@anarazel.de 1580 :UBC 0 : PG_RETURN_VOID();
1581 : : }
1582 : :
1583 : :
1584 : : Datum
4024 andres@anarazel.de 1585 :CBC 3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
1586 : : {
3341 noah@leadboat.com 1587 : 3 : text *name = PG_GETARG_TEXT_PP(0);
4000 bruce@momjian.us 1588 : 3 : XLogRecPtr remote_commit = PG_GETARG_LSN(1);
1589 : : ReplOriginId node;
1590 : :
4024 andres@anarazel.de 1591 : 3 : replorigin_check_prerequisites(true, false);
1592 : :
1593 : : /* lock to prevent the replication origin from vanishing */
1594 : 3 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1595 : :
1596 : 3 : node = replorigin_by_name(text_to_cstring(name), false);
1597 : :
1598 : : /*
1599 : : * Can't sensibly pass a local commit to be flushed at checkpoint - this
1600 : : * xact hasn't committed yet. This is why this function should be used to
1601 : : * set up the initial replication state, but not for replay.
1602 : : */
1603 : 2 : replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1604 : : true /* go backward */ , true /* WAL log */ );
1605 : :
1606 : 2 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1607 : :
1608 : 2 : PG_RETURN_VOID();
1609 : : }
1610 : :
1611 : :
1612 : : /*
1613 : : * Return the replication progress for an individual replication origin.
1614 : : *
1615 : : * If 'flush' is set to true it is ensured that the returned value corresponds
1616 : : * to a local transaction that has been flushed. This is useful if asynchronous
1617 : : * commits are used when replaying replicated transactions.
1618 : : */
1619 : : Datum
1620 : 3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
1621 : : {
1622 : : char *name;
1623 : : bool flush;
1624 : : ReplOriginId roident;
1625 : 3 : XLogRecPtr remote_lsn = InvalidXLogRecPtr;
1626 : :
1627 : 3 : replorigin_check_prerequisites(true, true);
1628 : :
1629 : 3 : name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1630 : 3 : flush = PG_GETARG_BOOL(1);
1631 : :
1632 : 3 : roident = replorigin_by_name(name, false);
1633 [ - + ]: 2 : Assert(OidIsValid(roident));
1634 : :
1635 : 2 : remote_lsn = replorigin_get_progress(roident, flush);
1636 : :
180 alvherre@kurilemu.de 1637 [ - + ]:GNC 2 : if (!XLogRecPtrIsValid(remote_lsn))
4024 andres@anarazel.de 1638 :UBC 0 : PG_RETURN_NULL();
1639 : :
4024 andres@anarazel.de 1640 :CBC 2 : PG_RETURN_LSN(remote_lsn);
1641 : : }
1642 : :
1643 : :
1644 : : Datum
1645 : 11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1646 : : {
1647 : 11 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1648 : : int i;
1649 : : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
1650 : :
1651 : : /* we want to return 0 rows if slot is set to zero */
1652 : 11 : replorigin_check_prerequisites(false, true);
1653 : :
1295 michael@paquier.xyz 1654 : 11 : InitMaterializedSRF(fcinfo, 0);
1655 : :
1656 : : /* prevent slots from being concurrently dropped */
4024 andres@anarazel.de 1657 : 11 : LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1658 : :
1659 : : /*
1660 : : * Iterate through all possible replication_states, display if they are
1661 : : * filled. Note that we do not take any locks, so slightly corrupted/out
1662 : : * of date values are a possibility.
1663 : : */
410 msawada@postgresql.o 1664 [ + + ]: 121 : for (i = 0; i < max_active_replication_origins; i++)
1665 : : {
1666 : : ReplicationState *state;
1667 : : Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
1668 : : bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1669 : : char *roname;
1670 : :
4024 andres@anarazel.de 1671 : 110 : state = &replication_states[i];
1672 : :
1673 : : /* unused slot, nothing to display */
97 msawada@postgresql.o 1674 [ + + ]:GNC 110 : if (state->roident == InvalidReplOriginId)
4024 andres@anarazel.de 1675 :CBC 97 : continue;
1676 : :
1677 : 13 : memset(values, 0, sizeof(values));
1678 : 13 : memset(nulls, 1, sizeof(nulls));
1679 : :
1680 : 13 : values[0] = ObjectIdGetDatum(state->roident);
1681 : 13 : nulls[0] = false;
1682 : :
1683 : : /*
1684 : : * We're not preventing the origin to be dropped concurrently, so
1685 : : * silently accept that it might be gone.
1686 : : */
1687 [ + - ]: 13 : if (replorigin_by_oid(state->roident, true,
1688 : : &roname))
1689 : : {
1690 : 13 : values[1] = CStringGetTextDatum(roname);
1691 : 13 : nulls[1] = false;
1692 : : }
1693 : :
1694 : 13 : LWLockAcquire(&state->lock, LW_SHARED);
1695 : :
4000 bruce@momjian.us 1696 : 13 : values[2] = LSNGetDatum(state->remote_lsn);
4024 andres@anarazel.de 1697 : 13 : nulls[2] = false;
1698 : :
1699 : 13 : values[3] = LSNGetDatum(state->local_lsn);
1700 : 13 : nulls[3] = false;
1701 : :
1702 : 13 : LWLockRelease(&state->lock);
1703 : :
1520 michael@paquier.xyz 1704 : 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1705 : : values, nulls);
1706 : : }
1707 : :
4024 andres@anarazel.de 1708 : 11 : LWLockRelease(ReplicationOriginLock);
1709 : :
1710 : : #undef REPLICATION_ORIGIN_PROGRESS_COLS
1711 : :
1712 : 11 : return (Datum) 0;
1713 : : }
|