Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the directory
24 : : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : : * contain the slot's own data. Additional data can be stored alongside that
26 : : * file if required. While the server is running, the state data is also
27 : : * cached in memory for efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/logicallauncher.h"
51 : : #include "replication/slotsync.h"
52 : : #include "replication/slot.h"
53 : : #include "replication/walsender_private.h"
54 : : #include "storage/fd.h"
55 : : #include "storage/ipc.h"
56 : : #include "storage/proc.h"
57 : : #include "storage/procarray.h"
58 : : #include "utils/builtins.h"
59 : : #include "utils/guc_hooks.h"
60 : : #include "utils/injection_point.h"
61 : : #include "utils/varlena.h"
62 : :
63 : : /*
64 : : * Replication slot on-disk data structure.
65 : : */
66 : : typedef struct ReplicationSlotOnDisk
67 : : {
68 : : /* first part of this struct needs to be version independent */
69 : :
70 : : /* data not covered by checksum */
71 : : uint32 magic;
72 : : pg_crc32c checksum;
73 : :
74 : : /* data covered by checksum */
75 : : uint32 version;
76 : : uint32 length;
77 : :
78 : : /*
79 : : * The actual data in the slot that follows can differ based on the above
80 : : * 'version'.
81 : : */
82 : :
83 : : ReplicationSlotPersistentData slotdata;
84 : : } ReplicationSlotOnDisk;
85 : :
86 : : /*
87 : : * Struct for the configuration of synchronized_standby_slots.
88 : : *
89 : : * Note: this must be a flat representation that can be held in a single chunk
90 : : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 : : * synchronized_standby_slots GUC.
92 : : */
93 : : typedef struct
94 : : {
95 : : /* Number of slot names in the slot_names[] */
96 : : int nslotnames;
97 : :
98 : : /*
99 : : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 : : */
101 : : char slot_names[FLEXIBLE_ARRAY_MEMBER];
102 : : } SyncStandbySlotsConfigData;
103 : :
104 : : /*
105 : : * Lookup table for slot invalidation causes.
106 : : */
107 : : typedef struct SlotInvalidationCauseMap
108 : : {
109 : : ReplicationSlotInvalidationCause cause;
110 : : const char *cause_name;
111 : : } SlotInvalidationCauseMap;
112 : :
113 : : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
114 : : {RS_INVAL_NONE, "none"},
115 : : {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 : : {RS_INVAL_HORIZON, "rows_removed"},
117 : : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 : : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119 : : };
120 : :
121 : : /*
122 : : * Ensure that the lookup table is up-to-date with the enums defined in
123 : : * ReplicationSlotInvalidationCause.
124 : : */
125 : : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
126 : : "array length mismatch");
127 : :
128 : : /* size of version independent data */
129 : : #define ReplicationSlotOnDiskConstantSize \
130 : : offsetof(ReplicationSlotOnDisk, slotdata)
131 : : /* size of the part of the slot not covered by the checksum */
132 : : #define ReplicationSlotOnDiskNotChecksummedSize \
133 : : offsetof(ReplicationSlotOnDisk, version)
134 : : /* size of the part covered by the checksum */
135 : : #define ReplicationSlotOnDiskChecksummedSize \
136 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
137 : : /* size of the slot data that is version dependent */
138 : : #define ReplicationSlotOnDiskV2Size \
139 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
140 : :
141 : : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
142 : : #define SLOT_VERSION 5 /* version for new files */
143 : :
144 : : /* Control array for replication slot management */
145 : : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
146 : :
147 : : /* My backend's replication slot in the shared memory array */
148 : : ReplicationSlot *MyReplicationSlot = NULL;
149 : :
150 : : /* GUC variables */
151 : : int max_replication_slots = 10; /* the maximum number of replication
152 : : * slots */
153 : :
154 : : /*
155 : : * Invalidate replication slots that have remained idle longer than this
156 : : * duration; '0' disables it.
157 : : */
158 : : int idle_replication_slot_timeout_secs = 0;
159 : :
160 : : /*
161 : : * This GUC lists streaming replication standby server slot names that
162 : : * logical WAL sender processes will wait for.
163 : : */
164 : : char *synchronized_standby_slots;
165 : :
166 : : /* This is the parsed and cached configuration for synchronized_standby_slots */
167 : : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
168 : :
169 : : /*
170 : : * Oldest LSN that has been confirmed to be flushed to the standbys
171 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 : : */
173 : : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
174 : :
175 : : static void ReplicationSlotShmemExit(int code, Datum arg);
176 : : static bool IsSlotForConflictCheck(const char *name);
177 : : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178 : :
179 : : /* internal persistency functions */
180 : : static void RestoreSlotFromDisk(const char *name);
181 : : static void CreateSlotOnDisk(ReplicationSlot *slot);
182 : : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 : :
184 : : /*
185 : : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : : */
187 : : Size
4236 rhaas@postgresql.org 188 :CBC 3965 : ReplicationSlotsShmemSize(void)
189 : : {
190 : 3965 : Size size = 0;
191 : :
192 [ + + ]: 3965 : if (max_replication_slots == 0)
4236 rhaas@postgresql.org 193 :GBC 2 : return size;
194 : :
4236 rhaas@postgresql.org 195 :CBC 3963 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 : 3963 : size = add_size(size,
197 : : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 : :
199 : 3963 : return size;
200 : : }
201 : :
202 : : /*
203 : : * Allocate and initialize shared memory for replication slots.
204 : : */
205 : : void
206 : 1029 : ReplicationSlotsShmemInit(void)
207 : : {
208 : : bool found;
209 : :
210 [ + + ]: 1029 : if (max_replication_slots == 0)
4236 rhaas@postgresql.org 211 :GBC 1 : return;
212 : :
4236 rhaas@postgresql.org 213 :CBC 1028 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 : 1028 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : : &found);
216 : :
217 [ + - ]: 1028 : if (!found)
218 : : {
219 : : int i;
220 : :
221 : : /* First time through, so initialize */
222 [ + - + - : 1928 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+ - + + +
+ ]
223 : :
224 [ + + ]: 11159 : for (i = 0; i < max_replication_slots; i++)
225 : : {
226 : 10131 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 : :
228 : : /* everything else is zeroed by the memset above */
229 : 10131 : SpinLockInit(&slot->mutex);
1940 tgl@sss.pgh.pa.us 230 : 10131 : LWLockInitialize(&slot->io_in_progress_lock,
231 : : LWTRANCHE_REPLICATION_SLOT_IO);
2965 alvherre@alvh.no-ip. 232 : 10131 : ConditionVariableInit(&slot->active_cv);
233 : : }
234 : : }
235 : : }
236 : :
237 : : /*
238 : : * Register the callback for replication slot cleanup and releasing.
239 : : */
240 : : void
1300 andres@anarazel.de 241 : 18749 : ReplicationSlotInitialize(void)
242 : : {
243 : 18749 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 : 18749 : }
245 : :
246 : : /*
247 : : * Release and cleanup replication slots.
248 : : */
249 : : static void
250 : 18749 : ReplicationSlotShmemExit(int code, Datum arg)
251 : : {
252 : : /* Make sure active replication slots are released */
253 [ + + ]: 18749 : if (MyReplicationSlot != NULL)
254 : 247 : ReplicationSlotRelease();
255 : :
256 : : /* Also cleanup all the temporary slots. */
499 akapila@postgresql.o 257 : 18749 : ReplicationSlotCleanup(false);
1300 andres@anarazel.de 258 : 18749 : }
259 : :
260 : : /*
261 : : * Check whether the passed slot name is valid and report errors at elevel.
262 : : *
263 : : * An error will be reported for a reserved replication slot name if
264 : : * allow_reserved_name is set to false.
265 : : *
 266 : : * Slot names may consist only of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
267 : : * the name to be used as a directory name on every supported OS.
268 : : *
 269 : : * Returns whether the slot name is valid or not if elevel < ERROR.
270 : : */
271 : : bool
45 akapila@postgresql.o 272 :GNC 860 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
273 : : int elevel)
274 : : {
275 : : const char *cp;
276 : :
4236 rhaas@postgresql.org 277 [ + + ]:CBC 860 : if (strlen(name) == 0)
278 : : {
279 [ + - ]: 3 : ereport(elevel,
280 : : (errcode(ERRCODE_INVALID_NAME),
281 : : errmsg("replication slot name \"%s\" is too short",
282 : : name)));
4236 rhaas@postgresql.org 283 :UBC 0 : return false;
284 : : }
285 : :
4236 rhaas@postgresql.org 286 [ - + ]:CBC 857 : if (strlen(name) >= NAMEDATALEN)
287 : : {
4236 rhaas@postgresql.org 288 [ # # ]:UBC 0 : ereport(elevel,
289 : : (errcode(ERRCODE_NAME_TOO_LONG),
290 : : errmsg("replication slot name \"%s\" is too long",
291 : : name)));
292 : 0 : return false;
293 : : }
294 : :
4236 rhaas@postgresql.org 295 [ + + ]:CBC 16789 : for (cp = name; *cp; cp++)
296 : : {
297 [ + + - + ]: 15933 : if (!((*cp >= 'a' && *cp <= 'z')
298 [ + - + + ]: 8243 : || (*cp >= '0' && *cp <= '9')
299 [ + + ]: 1606 : || (*cp == '_')))
300 : : {
301 [ + - ]: 1 : ereport(elevel,
302 : : (errcode(ERRCODE_INVALID_NAME),
303 : : errmsg("replication slot name \"%s\" contains invalid character",
304 : : name),
305 : : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
4236 rhaas@postgresql.org 306 :UBC 0 : return false;
307 : : }
308 : : }
309 : :
45 akapila@postgresql.o 310 [ + + - + ]:GNC 856 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
311 : : {
45 akapila@postgresql.o 312 [ # # ]:UNC 0 : ereport(elevel,
313 : : errcode(ERRCODE_RESERVED_NAME),
314 : : errmsg("replication slot name \"%s\" is reserved",
315 : : name),
316 : : errdetail("The name \"%s\" is reserved for the conflict detection slot.",
317 : : CONFLICT_DETECTION_SLOT));
318 : :
319 : 0 : return false;
320 : : }
321 : :
4236 rhaas@postgresql.org 322 :CBC 856 : return true;
323 : : }
324 : :
325 : : /*
326 : : * Return true if the replication slot name is "pg_conflict_detection".
327 : : */
328 : : static bool
45 akapila@postgresql.o 329 :GNC 1969 : IsSlotForConflictCheck(const char *name)
330 : : {
331 : 1969 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
332 : : }
333 : :
334 : : /*
335 : : * Create a new replication slot and mark it as used by this backend.
336 : : *
337 : : * name: Name of the slot
338 : : * db_specific: logical decoding is db specific; if the slot is going to
339 : : * be used for that pass true, otherwise false.
340 : : * two_phase: Allows decoding of prepared transactions. We allow this option
341 : : * to be enabled only at the slot creation time. If we allow this option
342 : : * to be changed during decoding then it is quite possible that we skip
343 : : * prepare first time because this option was not enabled. Now next time
344 : : * during getting changes, if the two_phase option is enabled it can skip
345 : : * prepare because by that time start decoding point has been moved. So the
346 : : * user will only get commit prepared.
347 : : * failover: If enabled, allows the slot to be synced to standbys so
348 : : * that logical replication can be resumed after failover.
349 : : * synced: True if the slot is synchronized from the primary server.
350 : : */
351 : : void
4205 rhaas@postgresql.org 352 :CBC 617 : ReplicationSlotCreate(const char *name, bool db_specific,
353 : : ReplicationSlotPersistency persistency,
354 : : bool two_phase, bool failover, bool synced)
355 : : {
4236 356 : 617 : ReplicationSlot *slot = NULL;
357 : : int i;
358 : :
359 [ - + ]: 617 : Assert(MyReplicationSlot == NULL);
360 : :
361 : : /*
362 : : * The logical launcher or pg_upgrade may create or migrate an internal
363 : : * slot, so using a reserved name is allowed in these cases.
364 : : */
45 akapila@postgresql.o 365 [ + + + + ]:GNC 617 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
366 : : ERROR);
367 : :
570 akapila@postgresql.o 368 [ + + ]:CBC 616 : if (failover)
369 : : {
370 : : /*
371 : : * Do not allow users to create the failover enabled slots on the
372 : : * standby as we do not support sync to the cascading standby.
373 : : *
374 : : * However, failover enabled slots can be created during slot
375 : : * synchronization because we need to retain the same values as the
376 : : * remote slot.
377 : : */
378 [ + + - + ]: 22 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
570 akapila@postgresql.o 379 [ # # ]:UBC 0 : ereport(ERROR,
380 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
381 : : errmsg("cannot enable failover for a replication slot created on the standby"));
382 : :
383 : : /*
384 : : * Do not allow users to create failover enabled temporary slots,
385 : : * because temporary slots will not be synced to the standby.
386 : : *
387 : : * However, failover enabled temporary slots can be created during
388 : : * slot synchronization. See the comments atop slotsync.c for details.
389 : : */
570 akapila@postgresql.o 390 [ + + + + ]:CBC 22 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
391 [ + - ]: 1 : ereport(ERROR,
392 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
393 : : errmsg("cannot enable failover for a temporary replication slot"));
394 : : }
395 : :
396 : : /*
397 : : * If some other backend ran this code concurrently with us, we'd likely
398 : : * both allocate the same slot, and that would be bad. We'd also be at
399 : : * risk of missing a name collision. Also, we don't want to try to create
400 : : * a new slot while somebody's busy cleaning up an old one, because we
401 : : * might both be monkeying with the same directory.
402 : : */
4236 rhaas@postgresql.org 403 : 615 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
404 : :
405 : : /*
406 : : * Check for name collision, and identify an allocatable slot. We need to
407 : : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
408 : : * else can change the in_use flags while we're looking at them.
409 : : */
410 : 615 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
411 [ + + ]: 5999 : for (i = 0; i < max_replication_slots; i++)
412 : : {
413 : 5387 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
414 : :
415 [ + + + + ]: 5387 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
416 [ + - ]: 3 : ereport(ERROR,
417 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
418 : : errmsg("replication slot \"%s\" already exists", name)));
419 [ + + + + ]: 5384 : if (!s->in_use && slot == NULL)
420 : 611 : slot = s;
421 : : }
422 : 612 : LWLockRelease(ReplicationSlotControlLock);
423 : :
424 : : /* If all slots are in use, we're out of luck. */
425 [ + + ]: 612 : if (slot == NULL)
426 [ + - ]: 1 : ereport(ERROR,
427 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
428 : : errmsg("all replication slots are in use"),
429 : : errhint("Free one or increase \"max_replication_slots\".")));
430 : :
431 : : /*
432 : : * Since this slot is not in use, nobody should be looking at any part of
433 : : * it other than the in_use field unless they're trying to allocate it.
434 : : * And since we hold ReplicationSlotAllocationLock, nobody except us can
435 : : * be doing that. So it's safe to initialize the slot.
436 : : */
437 [ - + ]: 611 : Assert(!slot->in_use);
3791 andres@anarazel.de 438 [ - + ]: 611 : Assert(slot->active_pid == 0);
439 : :
440 : : /* first initialize persistent data */
3307 441 : 611 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
1853 peter@eisentraut.org 442 : 611 : namestrcpy(&slot->data.name, name);
4236 rhaas@postgresql.org 443 [ + + ]: 611 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
3307 andres@anarazel.de 444 : 611 : slot->data.persistency = persistency;
1648 akapila@postgresql.o 445 : 611 : slot->data.two_phase = two_phase;
1515 446 : 611 : slot->data.two_phase_at = InvalidXLogRecPtr;
590 447 : 611 : slot->data.failover = failover;
570 448 : 611 : slot->data.synced = synced;
449 : :
450 : : /* and then data only present in shared memory */
3307 andres@anarazel.de 451 : 611 : slot->just_dirtied = false;
452 : 611 : slot->dirty = false;
453 : 611 : slot->effective_xmin = InvalidTransactionId;
454 : 611 : slot->effective_catalog_xmin = InvalidTransactionId;
455 : 611 : slot->candidate_catalog_xmin = InvalidTransactionId;
456 : 611 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
457 : 611 : slot->candidate_restart_valid = InvalidXLogRecPtr;
458 : 611 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
723 akapila@postgresql.o 459 : 611 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
84 akorotkov@postgresql 460 : 611 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
528 akapila@postgresql.o 461 : 611 : slot->inactive_since = 0;
462 : :
463 : : /*
464 : : * Create the slot on disk. We haven't actually marked the slot allocated
465 : : * yet, so no special cleanup is required if this errors out.
466 : : */
4236 rhaas@postgresql.org 467 : 611 : CreateSlotOnDisk(slot);
468 : :
469 : : /*
470 : : * We need to briefly prevent any other backend from iterating over the
471 : : * slots while we flip the in_use flag. We also need to set the active
472 : : * flag while holding the ControlLock as otherwise a concurrent
473 : : * ReplicationSlotAcquire() could acquire the slot as well.
474 : : */
475 : 611 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
476 : :
477 : 611 : slot->in_use = true;
478 : :
479 : : /* We can now mark the slot active, and that makes it our slot. */
3623 480 [ - + ]: 611 : SpinLockAcquire(&slot->mutex);
481 [ - + ]: 611 : Assert(slot->active_pid == 0);
482 : 611 : slot->active_pid = MyProcPid;
483 : 611 : SpinLockRelease(&slot->mutex);
484 : 611 : MyReplicationSlot = slot;
485 : :
4236 486 : 611 : LWLockRelease(ReplicationSlotControlLock);
487 : :
488 : : /*
489 : : * Create statistics entry for the new logical slot. We don't collect any
490 : : * stats for physical slots, so no need to create an entry for the same.
491 : : * See ReplicationSlotDropPtr for why we need to do this before releasing
492 : : * ReplicationSlotAllocationLock.
493 : : */
1794 akapila@postgresql.o 494 [ + + ]: 611 : if (SlotIsLogical(slot))
1249 andres@anarazel.de 495 : 438 : pgstat_create_replslot(slot);
496 : :
497 : : /*
498 : : * Now that the slot has been marked as in_use and active, it's safe to
499 : : * let somebody else try to allocate a slot.
500 : : */
4236 rhaas@postgresql.org 501 : 611 : LWLockRelease(ReplicationSlotAllocationLock);
502 : :
503 : : /* Let everybody know we've modified this slot */
2965 alvherre@alvh.no-ip. 504 : 611 : ConditionVariableBroadcast(&slot->active_cv);
4236 rhaas@postgresql.org 505 : 611 : }
506 : :
507 : : /*
508 : : * Search for the named replication slot.
509 : : *
510 : : * Return the replication slot if found, otherwise NULL.
511 : : */
512 : : ReplicationSlot *
1593 akapila@postgresql.o 513 : 1790 : SearchNamedReplicationSlot(const char *name, bool need_lock)
514 : : {
515 : : int i;
516 : 1790 : ReplicationSlot *slot = NULL;
517 : :
518 [ + + ]: 1790 : if (need_lock)
519 : 493 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
520 : :
4236 rhaas@postgresql.org 521 [ + + ]: 6593 : for (i = 0; i < max_replication_slots; i++)
522 : : {
523 : 6184 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
524 : :
525 [ + + + + ]: 6184 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
526 : : {
527 : 1381 : slot = s;
528 : 1381 : break;
529 : : }
530 : : }
531 : :
1593 akapila@postgresql.o 532 [ + + ]: 1790 : if (need_lock)
533 : 493 : LWLockRelease(ReplicationSlotControlLock);
534 : :
1905 fujii@postgresql.org 535 : 1790 : return slot;
536 : : }
537 : :
538 : : /*
539 : : * Return the index of the replication slot in
540 : : * ReplicationSlotCtl->replication_slots.
541 : : *
542 : : * This is mainly useful to have an efficient key for storing replication slot
543 : : * stats.
544 : : */
545 : : int
1249 andres@anarazel.de 546 : 7085 : ReplicationSlotIndex(ReplicationSlot *slot)
547 : : {
548 [ + - - + ]: 7085 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
549 : : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
550 : :
551 : 7085 : return slot - ReplicationSlotCtl->replication_slots;
552 : : }
553 : :
554 : : /*
555 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
556 : : * the slot's name and true is returned.
557 : : *
558 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
559 : : * cases there are obvious TOCTOU issues.
560 : : */
561 : : bool
1064 562 : 89 : ReplicationSlotName(int index, Name name)
563 : : {
564 : : ReplicationSlot *slot;
565 : : bool found;
566 : :
567 : 89 : slot = &ReplicationSlotCtl->replication_slots[index];
568 : :
569 : : /*
570 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
571 : : * need the spinlock as the name of an existing slot cannot change.
572 : : */
573 : 89 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
574 : 89 : found = slot->in_use;
575 [ + - ]: 89 : if (slot->in_use)
576 : 89 : namestrcpy(name, NameStr(slot->data.name));
577 : 89 : LWLockRelease(ReplicationSlotControlLock);
578 : :
579 : 89 : return found;
580 : : }
581 : :
582 : : /*
583 : : * Find a previously created slot and mark it as used by this process.
584 : : *
585 : : * An error is raised if nowait is true and the slot is currently in use. If
586 : : * nowait is false, we sleep until the slot is released by the owning process.
587 : : *
588 : : * An error is raised if error_if_invalid is true and the slot is found to
589 : : * be invalid. It should always be set to true, except when we are temporarily
590 : : * acquiring the slot and don't intend to change it.
591 : : */
592 : : void
218 akapila@postgresql.o 593 : 1223 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
594 : : {
595 : : ReplicationSlot *s;
596 : : int active_pid;
597 : :
1044 peter@eisentraut.org 598 [ + - ]: 1223 : Assert(name != NULL);
599 : :
1905 fujii@postgresql.org 600 : 1223 : retry:
601 [ - + ]: 1223 : Assert(MyReplicationSlot == NULL);
602 : :
603 : 1223 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
604 : :
 605 : : /* Check if the slot exists with the given name. */
1548 alvherre@alvh.no-ip. 606 : 1223 : s = SearchNamedReplicationSlot(name, false);
1905 fujii@postgresql.org 607 [ + + - + ]: 1223 : if (s == NULL || !s->in_use)
608 : : {
609 : 10 : LWLockRelease(ReplicationSlotControlLock);
610 : :
4236 rhaas@postgresql.org 611 [ + - ]: 10 : ereport(ERROR,
612 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
613 : : errmsg("replication slot \"%s\" does not exist",
614 : : name)));
615 : : }
616 : :
617 : : /*
618 : : * Do not allow users to acquire the reserved slot. This scenario may
619 : : * occur if the launcher that owns the slot has terminated unexpectedly
620 : : * due to an error, and a backend process attempts to reuse the slot.
621 : : */
45 akapila@postgresql.o 622 [ + - - + ]:GNC 1213 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
45 akapila@postgresql.o 623 [ # # ]:UNC 0 : ereport(ERROR,
624 : : errcode(ERRCODE_UNDEFINED_OBJECT),
625 : : errmsg("cannot acquire replication slot \"%s\"", name),
626 : : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
627 : :
628 : : /*
629 : : * This is the slot we want; check if it's active under some other
630 : : * process. In single user mode, we don't need this check.
631 : : */
1905 fujii@postgresql.org 632 [ + + ]:CBC 1213 : if (IsUnderPostmaster)
633 : : {
634 : : /*
635 : : * Get ready to sleep on the slot in case it is active. (We may end
636 : : * up not sleeping, but we don't want to do this while holding the
637 : : * spinlock.)
638 : : */
1548 alvherre@alvh.no-ip. 639 [ + + ]: 1208 : if (!nowait)
1905 fujii@postgresql.org 640 : 266 : ConditionVariablePrepareToSleep(&s->active_cv);
641 : :
642 : : /*
643 : : * It is important to reset the inactive_since under spinlock here to
644 : : * avoid race conditions with slot invalidation. See comments related
645 : : * to inactive_since in InvalidatePossiblyObsoleteSlot.
646 : : */
647 [ - + ]: 1208 : SpinLockAcquire(&s->mutex);
648 [ + + ]: 1208 : if (s->active_pid == 0)
649 : 1073 : s->active_pid = MyProcPid;
650 : 1208 : active_pid = s->active_pid;
199 akapila@postgresql.o 651 : 1208 : ReplicationSlotSetInactiveSince(s, 0, false);
1905 fujii@postgresql.org 652 : 1208 : SpinLockRelease(&s->mutex);
653 : : }
654 : : else
655 : : {
17 michael@paquier.xyz 656 : 5 : s->active_pid = active_pid = MyProcPid;
199 akapila@postgresql.o 657 : 5 : ReplicationSlotSetInactiveSince(s, 0, true);
658 : : }
1905 fujii@postgresql.org 659 : 1213 : LWLockRelease(ReplicationSlotControlLock);
660 : :
661 : : /*
662 : : * If we found the slot but it's already active in another process, we
663 : : * wait until the owning process signals us that it's been released, or
664 : : * error out.
665 : : */
3194 peter_e@gmx.net 666 [ - + ]: 1213 : if (active_pid != MyProcPid)
667 : : {
1548 alvherre@alvh.no-ip. 668 [ # # ]:UBC 0 : if (!nowait)
669 : : {
670 : : /* Wait here until we get signaled, and then restart */
1545 671 : 0 : ConditionVariableSleep(&s->active_cv,
672 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
673 : 0 : ConditionVariableCancelSleep();
674 : 0 : goto retry;
675 : : }
676 : :
677 [ # # ]: 0 : ereport(ERROR,
678 : : (errcode(ERRCODE_OBJECT_IN_USE),
679 : : errmsg("replication slot \"%s\" is active for PID %d",
680 : : NameStr(s->data.name), active_pid)));
681 : : }
1548 alvherre@alvh.no-ip. 682 [ + + ]:CBC 1213 : else if (!nowait)
1578 tgl@sss.pgh.pa.us 683 : 266 : ConditionVariableCancelSleep(); /* no sleep needed after all */
684 : :
685 : : /* We made this slot active, so it's ours now. */
1905 fujii@postgresql.org 686 : 1213 : MyReplicationSlot = s;
687 : :
688 : : /*
689 : : * We need to check for invalidation after making the slot ours to avoid
690 : : * the possible race condition with the checkpointer that can otherwise
691 : : * invalidate the slot immediately after the check.
692 : : */
191 akapila@postgresql.o 693 [ + + + + ]: 1213 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
694 [ + - ]: 1 : ereport(ERROR,
695 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
696 : : errmsg("can no longer access replication slot \"%s\"",
697 : : NameStr(s->data.name)),
698 : : errdetail("This replication slot has been invalidated due to \"%s\".",
699 : : GetSlotInvalidationCauseName(s->data.invalidated)));
700 : :
701 : : /* Let everybody know we've modified this slot */
702 : 1212 : ConditionVariableBroadcast(&s->active_cv);
703 : :
704 : : /*
705 : : * The call to pgstat_acquire_replslot() protects against stats for a
706 : : * different slot, from before a restart or such, being present during
707 : : * pgstat_report_replslot().
708 : : */
1249 andres@anarazel.de 709 [ + + ]: 1212 : if (SlotIsLogical(s))
710 : 1021 : pgstat_acquire_replslot(s);
711 : :
712 : :
655 akapila@postgresql.o 713 [ + + ]: 1212 : if (am_walsender)
714 : : {
715 [ + - + - : 835 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
716 : : SlotIsLogical(s)
717 : : ? errmsg("acquired logical replication slot \"%s\"",
718 : : NameStr(s->data.name))
719 : : : errmsg("acquired physical replication slot \"%s\"",
720 : : NameStr(s->data.name)));
721 : : }
4236 rhaas@postgresql.org 722 : 1212 : }
723 : :
724 : : /*
725 : : * Release the replication slot that this backend considers to own.
726 : : *
727 : : * This or another backend can re-acquire the slot later.
728 : : * Resources this slot requires will be preserved.
729 : : */
 730 : : void
 731 : 1447 : ReplicationSlotRelease(void)
 732 : : {
 733 : 1447 : ReplicationSlot *slot = MyReplicationSlot;
 655 akapila@postgresql.o 734 : 1447 : char *slotname = NULL; /* keep compiler quiet */
 735 : 1447 : bool is_logical = false; /* keep compiler quiet */
 530 736 : 1447 : TimestampTz now = 0;
 737 : :
 3791 andres@anarazel.de 738 [ + - - + ]: 1447 : Assert(slot != NULL && slot->active_pid != 0);
 739 : :
 : : /* Capture name/kind now: an ephemeral slot is dropped below, before the log message uses them. */
 655 akapila@postgresql.o 740 [ + + ]: 1447 : if (am_walsender)
 741 : : {
 742 : 1012 : slotname = pstrdup(NameStr(slot->data.name));
 743 : 1012 : is_logical = SlotIsLogical(slot);
 744 : : }
 745 : :
 4205 rhaas@postgresql.org 746 [ + + ]: 1447 : if (slot->data.persistency == RS_EPHEMERAL)
 747 : : {
 748 : : /*
 749 : : * Delete the slot. There is no !PANIC case where this is allowed to
 750 : : * fail, all that may happen is an incomplete cleanup of the on-disk
 751 : : * data.
 752 : : */
 753 : 5 : ReplicationSlotDropAcquired();
 754 : : }
 755 : :
 756 : : /*
 757 : : * If slot needed to temporarily restrain both data and catalog xmin to
 758 : : * create the catalog snapshot, remove that temporary constraint.
 759 : : * Snapshots can only be exported while the initial snapshot is still
 760 : : * acquired.
 761 : : */
 3058 andres@anarazel.de 762 [ + + ]: 1447 : if (!TransactionIdIsValid(slot->data.xmin) &&
 763 [ + + ]: 1425 : TransactionIdIsValid(slot->effective_xmin))
 764 : : {
 765 [ - + ]: 195 : SpinLockAcquire(&slot->mutex);
 766 : 195 : slot->effective_xmin = InvalidTransactionId;
 767 : 195 : SpinLockRelease(&slot->mutex);
 768 : 195 : ReplicationSlotsComputeRequiredXmin(false);
 769 : : }
 770 : :
 771 : : /*
 772 : : * Set the time since the slot has become inactive. We get the current
 773 : : * time beforehand to avoid system call while holding the spinlock.
 774 : : */
 519 akapila@postgresql.o 775 : 1447 : now = GetCurrentTimestamp();
 776 : :
 2965 alvherre@alvh.no-ip. 777 [ + + ]: 1447 : if (slot->data.persistency == RS_PERSISTENT)
 778 : : {
 779 : : /*
 780 : : * Mark persistent slot inactive. We're not freeing it, just
 781 : : * disconnecting, but wake up others that may be waiting for it.
 782 : : */
 783 [ - + ]: 1169 : SpinLockAcquire(&slot->mutex);
 784 : 1169 : slot->active_pid = 0;
 213 akapila@postgresql.o 785 : 1169 : ReplicationSlotSetInactiveSince(slot, now, false);
 2965 alvherre@alvh.no-ip. 786 : 1169 : SpinLockRelease(&slot->mutex);
 787 : 1169 : ConditionVariableBroadcast(&slot->active_cv);
 788 : : }
 789 : : else
 213 akapila@postgresql.o 790 : 278 : ReplicationSlotSetInactiveSince(slot, now, true);
 791 : :
 4205 rhaas@postgresql.org 792 : 1447 : MyReplicationSlot = NULL;
 793 : :
 794 : : /* might not have been set when we've been a plain slot */
 : : /* NOTE(review): the cleared flag is also mirrored into ProcGlobal's dense statusFlags array below. */
 1395 alvherre@alvh.no-ip. 795 : 1447 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 1755 796 : 1447 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
 797 : 1447 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
 4205 rhaas@postgresql.org 798 : 1447 : LWLockRelease(ProcArrayLock);
 799 : :
 655 akapila@postgresql.o 800 [ + + ]: 1447 : if (am_walsender)
 801 : : {
 802 [ + - + - : 1012 : ereport(log_replication_commands ? LOG : DEBUG1,
 + + ]
 803 : : is_logical
 804 : : ? errmsg("released logical replication slot \"%s\"",
 805 : : slotname)
 806 : : : errmsg("released physical replication slot \"%s\"",
 807 : : slotname));
 808 : :
 809 : 1012 : pfree(slotname);
 810 : : }
 4236 rhaas@postgresql.org 811 : 1447 : }
812 : :
813 : : /*
814 : : * Cleanup temporary slots created in current session.
815 : : *
816 : : * Cleanup only synced temporary slots if 'synced_only' is true, else
817 : : * cleanup all temporary slots.
818 : : */
 819 : : void
 499 akapila@postgresql.o 820 : 40534 : ReplicationSlotCleanup(bool synced_only)
 821 : : {
 822 : : int i;
 823 : :
 3194 peter_e@gmx.net 824 [ + - ]: 40534 : Assert(MyReplicationSlot == NULL);
 825 : :
 : : /* Rescan from the top after every drop: the control lock is released for the drop itself. */
 2965 alvherre@alvh.no-ip. 826 : 40534 : restart:
 827 : 40672 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 3194 peter_e@gmx.net 828 [ + + ]: 442378 : for (i = 0; i < max_replication_slots; i++)
 829 : : {
 830 : 401844 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 831 : :
 2965 alvherre@alvh.no-ip. 832 [ + + ]: 401844 : if (!s->in_use)
 833 : 388731 : continue;
 834 : :
 835 [ + + ]: 13113 : SpinLockAcquire(&s->mutex);
 499 akapila@postgresql.o 836 [ + + ]: 13113 : if ((s->active_pid == MyProcPid &&
 837 [ - + - - ]: 138 : (!synced_only || s->data.synced)))
 838 : : {
 2965 alvherre@alvh.no-ip. 839 [ - + ]: 138 : Assert(s->data.persistency == RS_TEMPORARY);
 840 : 138 : SpinLockRelease(&s->mutex);
 841 : 138 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
 842 : :
 3194 peter_e@gmx.net 843 : 138 : ReplicationSlotDropPtr(s);
 844 : :
 2965 alvherre@alvh.no-ip. 845 : 138 : ConditionVariableBroadcast(&s->active_cv);
 846 : 138 : goto restart;
 847 : : }
 848 : : else
 849 : 12975 : SpinLockRelease(&s->mutex);
 850 : : }
 851 : :
 852 : 40534 : LWLockRelease(ReplicationSlotControlLock);
 3194 peter_e@gmx.net 853 : 40534 : }
854 : :
855 : : /*
856 : : * Permanently drop replication slot identified by the passed in name.
857 : : */
 858 : : void
 2965 alvherre@alvh.no-ip. 859 : 387 : ReplicationSlotDrop(const char *name, bool nowait)
 860 : : {
 4205 rhaas@postgresql.org 861 [ - + ]: 387 : Assert(MyReplicationSlot == NULL);
 862 : :
 : : /* NOTE(review): ReplicationSlotAcquire presumably ERRORs if the slot is missing or busy — confirm in its definition. */
 218 akapila@postgresql.o 863 : 387 : ReplicationSlotAcquire(name, nowait, false);
 864 : :
 865 : : /*
 866 : : * Do not allow users to drop the slots which are currently being synced
 867 : : * from the primary to the standby.
 868 : : */
 570 869 [ + + + - ]: 379 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
 870 [ + - ]: 1 : ereport(ERROR,
 871 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 872 : : errmsg("cannot drop replication slot \"%s\"", name),
 873 : : errdetail("This replication slot is being synchronized from the primary server."));
 874 : :
 4205 rhaas@postgresql.org 875 : 378 : ReplicationSlotDropAcquired();
 876 : 378 : }
877 : :
878 : : /*
879 : : * Change the definition of the slot identified by the specified name.
880 : : */
 881 : : void
 409 akapila@postgresql.o 882 : 6 : ReplicationSlotAlter(const char *name, const bool *failover,
 883 : : const bool *two_phase)
 884 : : {
 885 : 6 : bool update_slot = false;
 886 : :
 586 887 [ - + ]: 6 : Assert(MyReplicationSlot == NULL);
 409 888 [ + + - + ]: 6 : Assert(failover || two_phase);
 889 : :
 : : /* A NULL failover/two_phase pointer means "leave that property unchanged". */
 218 890 : 6 : ReplicationSlotAcquire(name, false, true);
 891 : :
 586 892 [ - + ]: 6 : if (SlotIsPhysical(MyReplicationSlot))
 586 akapila@postgresql.o 893 [ # # ]:UBC 0 : ereport(ERROR,
 894 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 895 : : errmsg("cannot use %s with a physical replication slot",
 896 : : "ALTER_REPLICATION_SLOT"));
 897 : :
 570 akapila@postgresql.o 898 [ + + ]:CBC 6 : if (RecoveryInProgress())
 899 : : {
 900 : : /*
 901 : : * Do not allow users to alter the slots which are currently being
 902 : : * synced from the primary to the standby.
 903 : : */
 904 [ + - ]: 1 : if (MyReplicationSlot->data.synced)
 905 [ + - ]: 1 : ereport(ERROR,
 906 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 907 : : errmsg("cannot alter replication slot \"%s\"", name),
 908 : : errdetail("This replication slot is being synchronized from the primary server."));
 909 : :
 910 : : /*
 911 : : * Do not allow users to enable failover on the standby as we do not
 912 : : * support sync to the cascading standby.
 913 : : */
 409 akapila@postgresql.o 914 [ # # # # ]:UBC 0 : if (failover && *failover)
 570 915 [ # # ]: 0 : ereport(ERROR,
 916 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 917 : : errmsg("cannot enable failover for a replication slot"
 918 : : " on the standby"));
 919 : : }
 920 : :
 409 akapila@postgresql.o 921 [ + + ]:CBC 5 : if (failover)
 922 : : {
 923 : : /*
 924 : : * Do not allow users to enable failover for temporary slots as we do
 925 : : * not support syncing temporary slots to the standby.
 926 : : */
 927 [ + + - + ]: 4 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
 409 akapila@postgresql.o 928 [ # # ]:UBC 0 : ereport(ERROR,
 929 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 930 : : errmsg("cannot enable failover for a temporary replication slot"));
 931 : :
 409 akapila@postgresql.o 932 [ + - ]:CBC 4 : if (MyReplicationSlot->data.failover != *failover)
 933 : : {
 934 [ - + ]: 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
 935 : 4 : MyReplicationSlot->data.failover = *failover;
 936 : 4 : SpinLockRelease(&MyReplicationSlot->mutex);
 937 : :
 938 : 4 : update_slot = true;
 939 : : }
 940 : : }
 941 : :
 942 [ + + + - ]: 5 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
 943 : : {
 577 944 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
 409 945 : 1 : MyReplicationSlot->data.two_phase = *two_phase;
 577 946 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
 947 : :
 409 948 : 1 : update_slot = true;
 949 : : }
 950 : :
 : : /* Persist to disk only if something actually changed. */
 951 [ + - ]: 5 : if (update_slot)
 952 : : {
 577 953 : 5 : ReplicationSlotMarkDirty();
 954 : 5 : ReplicationSlotSave();
 955 : : }
 956 : :
 586 957 : 5 : ReplicationSlotRelease();
 958 : 5 : }
959 : :
960 : : /*
961 : : * Permanently drop the currently acquired replication slot.
962 : : */
 963 : : void
 4205 rhaas@postgresql.org 964 : 389 : ReplicationSlotDropAcquired(void)
 965 : : {
 966 : 389 : ReplicationSlot *slot = MyReplicationSlot;
 967 : :
 968 [ - + ]: 389 : Assert(MyReplicationSlot != NULL);
 969 : :
 970 : : /* slot isn't acquired anymore */
 : : /* NOTE(review): cleared before the drop, presumably so error paths won't try to release it again — confirm. */
 971 : 389 : MyReplicationSlot = NULL;
 972 : :
 3194 peter_e@gmx.net 973 : 389 : ReplicationSlotDropPtr(slot);
 974 : 389 : }
975 : :
976 : : /*
977 : : * Permanently drop the replication slot which will be released by the point
978 : : * this function returns.
979 : : */
 980 : : static void
 981 : 527 : ReplicationSlotDropPtr(ReplicationSlot *slot)
 982 : : {
 983 : : char path[MAXPGPATH];
 984 : : char tmppath[MAXPGPATH];
 985 : :
 986 : : /*
 987 : : * If some other backend ran this code concurrently with us, we might try
 988 : : * to delete a slot with a certain name while someone else was trying to
 989 : : * create a slot with the same name.
 990 : : */
 4236 rhaas@postgresql.org 991 : 527 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 992 : :
 993 : : /* Generate pathnames. */
 372 michael@paquier.xyz 994 : 527 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
 995 : 527 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
 996 : :
 997 : : /*
 998 : : * Rename the slot directory on disk, so that we'll no longer recognize
 999 : : * this as a valid slot. Note that if this fails, we've got to mark the
 1000 : : * slot inactive before bailing out. If we're dropping an ephemeral or a
 1001 : : * temporary slot, we better never fail hard as the caller won't expect
 1002 : : * the slot to survive and this might get called during error handling.
 1003 : : */
 : : /* Two-phase removal: rename out of the way first; the ".tmp" directory is rmtree'd below. */
 4205 rhaas@postgresql.org 1004 [ + - ]: 527 : if (rename(path, tmppath) == 0)
 1005 : : {
 1006 : : /*
 1007 : : * We need to fsync() the directory we just renamed and its parent to
 1008 : : * make sure that our changes are on disk in a crash-safe fashion. If
 1009 : : * fsync() fails, we can't be sure whether the changes are on disk or
 1010 : : * not. For now, we handle that by panicking;
 1011 : : * StartupReplicationSlots() will try to straighten it out after
 1012 : : * restart.
 1013 : : */
 1014 : 527 : START_CRIT_SECTION();
 1015 : 527 : fsync_fname(tmppath, true);
 372 michael@paquier.xyz 1016 : 527 : fsync_fname(PG_REPLSLOT_DIR, true);
 4205 rhaas@postgresql.org 1017 [ - + ]: 527 : END_CRIT_SECTION();
 1018 : : }
 1019 : : else
 1020 : : {
 3194 peter_e@gmx.net 1021 :UBC 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
 1022 : :
 4236 rhaas@postgresql.org 1023 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
 3623 1024 : 0 : slot->active_pid = 0;
 4236 1025 : 0 : SpinLockRelease(&slot->mutex);
 1026 : :
 1027 : : /* wake up anyone waiting on this slot */
 2965 alvherre@alvh.no-ip. 1028 : 0 : ConditionVariableBroadcast(&slot->active_cv);
 1029 : :
 4205 rhaas@postgresql.org 1030 [ # # # # ]: 0 : ereport(fail_softly ? WARNING : ERROR,
 1031 : : (errcode_for_file_access(),
 1032 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
 1033 : : path, tmppath)));
 1034 : : }
 1035 : :
 1036 : : /*
 1037 : : * The slot is definitely gone. Lock out concurrent scans of the array
 1038 : : * long enough to kill it. It's OK to clear the active PID here without
 1039 : : * grabbing the mutex because nobody else can be scanning the array here,
 1040 : : * and nobody can be attached to this slot and thus access it without
 1041 : : * scanning the array.
 1042 : : *
 1043 : : * Also wake up processes waiting for it.
 1044 : : */
 4236 rhaas@postgresql.org 1045 :CBC 527 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 3791 andres@anarazel.de 1046 : 527 : slot->active_pid = 0;
 4236 rhaas@postgresql.org 1047 : 527 : slot->in_use = false;
 1048 : 527 : LWLockRelease(ReplicationSlotControlLock);
 2965 alvherre@alvh.no-ip. 1049 : 527 : ConditionVariableBroadcast(&slot->active_cv);
 1050 : :
 1051 : : /*
 1052 : : * Slot is dead and doesn't prevent resource removal anymore, recompute
 1053 : : * limits.
 1054 : : */
 4205 rhaas@postgresql.org 1055 : 527 : ReplicationSlotsComputeRequiredXmin(false);
 4236 1056 : 527 : ReplicationSlotsComputeRequiredLSN();
 1057 : :
 1058 : : /*
 1059 : : * If removing the directory fails, the worst thing that will happen is
 1060 : : * that the user won't be able to create a new slot with the same name
 1061 : : * until the next server restart. We warn about it, but that's all.
 1062 : : */
 1063 [ - + ]: 527 : if (!rmtree(tmppath, true))
 4236 rhaas@postgresql.org 1064 [ # # ]:UBC 0 : ereport(WARNING,
 1065 : : (errmsg("could not remove directory \"%s\"", tmppath)));
 1066 : :
 1067 : : /*
 1068 : : * Drop the statistics entry for the replication slot. Do this while
 1069 : : * holding ReplicationSlotAllocationLock so that we don't drop a
 1070 : : * statistics entry for another slot with the same name just created in
 1071 : : * another session.
 1072 : : */
 1794 akapila@postgresql.o 1073 [ + + ]:CBC 527 : if (SlotIsLogical(slot))
 1249 andres@anarazel.de 1074 : 378 : pgstat_drop_replslot(slot);
 1075 : :
 1076 : : /*
 1077 : : * We release this at the very end, so that nobody starts trying to create
 1078 : : * a slot while we're still cleaning up the detritus of the old one.
 1079 : : */
 4236 rhaas@postgresql.org 1080 : 527 : LWLockRelease(ReplicationSlotAllocationLock);
 1081 : 527 : }
1082 : :
1083 : : /*
1084 : : * Serialize the currently acquired slot's state from memory to disk, thereby
1085 : : * guaranteeing the current state will survive a crash.
1086 : : */
 1087 : : void
 1088 : 1276 : ReplicationSlotSave(void)
 1089 : : {
 1090 : : char path[MAXPGPATH];
 1091 : :
 1092 [ - + ]: 1276 : Assert(MyReplicationSlot != NULL);
 1093 : :
 : : /* Writes the state under PG_REPLSLOT_DIR/<slotname>; failures raise ERROR (elevel passed below). */
 372 michael@paquier.xyz 1094 : 1276 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
 4236 rhaas@postgresql.org 1095 : 1276 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
 1096 : 1276 : }
1097 : :
1098 : : /*
1099 : : * Signal that it would be useful if the currently acquired slot would be
1100 : : * flushed out to disk.
1101 : : *
1102 : : * Note that the actual flush to disk can be delayed for a long time, if
1103 : : * required for correctness explicitly do a ReplicationSlotSave().
1104 : : */
 1105 : : void
 1106 : 63894 : ReplicationSlotMarkDirty(void)
 1107 : : {
 3623 1108 : 63894 : ReplicationSlot *slot = MyReplicationSlot;
 1109 : :
 4236 1110 [ - + ]: 63894 : Assert(MyReplicationSlot != NULL);
 1111 : :
 : : /* NOTE(review): just_dirtied presumably lets a concurrent saver detect re-dirtying — confirm in SaveSlotToPath. */
 3623 1112 [ - + ]: 63894 : SpinLockAcquire(&slot->mutex);
 1113 : 63894 : MyReplicationSlot->just_dirtied = true;
 1114 : 63894 : MyReplicationSlot->dirty = true;
 1115 : 63894 : SpinLockRelease(&slot->mutex);
 4236 1116 : 63894 : }
1117 : :
1118 : : /*
1119 : : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1120 : : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1121 : : */
 1122 : : void
 4205 1123 : 425 : ReplicationSlotPersist(void)
 1124 : : {
 1125 : 425 : ReplicationSlot *slot = MyReplicationSlot;
 1126 : :
 1127 [ - + ]: 425 : Assert(slot != NULL);
 1128 [ - + ]: 425 : Assert(slot->data.persistency != RS_PERSISTENT);
 1129 : :
 3623 1130 [ - + ]: 425 : SpinLockAcquire(&slot->mutex);
 1131 : 425 : slot->data.persistency = RS_PERSISTENT;
 1132 : 425 : SpinLockRelease(&slot->mutex);
 1133 : :
 : : /* Flush immediately so the persistency change itself survives a crash. */
 4205 1134 : 425 : ReplicationSlotMarkDirty();
 1135 : 425 : ReplicationSlotSave();
 1136 : 425 : }
1137 : :
1138 : : /*
1139 : : * Compute the oldest xmin across all slots and store it in the ProcArray.
1140 : : *
1141 : : * If already_locked is true, ProcArrayLock has already been acquired
1142 : : * exclusively.
1143 : : */
 1144 : : void
 1145 : 2189 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
 1146 : : {
 1147 : : int i;
 4236 1148 : 2189 : TransactionId agg_xmin = InvalidTransactionId;
 4205 1149 : 2189 : TransactionId agg_catalog_xmin = InvalidTransactionId;
 1150 : :
 4236 1151 [ - + ]: 2189 : Assert(ReplicationSlotCtl != NULL);
 1152 : :
 3058 andres@anarazel.de 1153 : 2189 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 1154 : :
 : : /* Aggregate the minimum data and catalog xmins over all in-use, non-invalidated slots. */
 4236 rhaas@postgresql.org 1155 [ + + ]: 22369 : for (i = 0; i < max_replication_slots; i++)
 1156 : : {
 1157 : 20180 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 1158 : : TransactionId effective_xmin;
 1159 : : TransactionId effective_catalog_xmin;
 1160 : : bool invalidated;
 1161 : :
 1162 [ + + ]: 20180 : if (!s->in_use)
 1163 : 18084 : continue;
 1164 : :
 3623 1165 [ - + ]: 2096 : SpinLockAcquire(&s->mutex);
 1166 : 2096 : effective_xmin = s->effective_xmin;
 1167 : 2096 : effective_catalog_xmin = s->effective_catalog_xmin;
 883 andres@anarazel.de 1168 : 2096 : invalidated = s->data.invalidated != RS_INVAL_NONE;
 3623 rhaas@postgresql.org 1169 : 2096 : SpinLockRelease(&s->mutex);
 1170 : :
 1171 : : /* invalidated slots need not apply */
 1019 alvherre@alvh.no-ip. 1172 [ + + ]: 2096 : if (invalidated)
 1173 : 5 : continue;
 1174 : :
 1175 : : /* check the data xmin */
 4236 rhaas@postgresql.org 1176 [ + + + + ]: 2091 : if (TransactionIdIsValid(effective_xmin) &&
 1177 [ - + ]: 2 : (!TransactionIdIsValid(agg_xmin) ||
 1178 : 2 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
 1179 : 270 : agg_xmin = effective_xmin;
 1180 : :
 1181 : : /* check the catalog xmin */
 4205 1182 [ + + + + ]: 2091 : if (TransactionIdIsValid(effective_catalog_xmin) &&
 1183 [ + + ]: 892 : (!TransactionIdIsValid(agg_catalog_xmin) ||
 1184 : 892 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
 1185 : 1104 : agg_catalog_xmin = effective_catalog_xmin;
 1186 : : }
 1187 : :
 3058 andres@anarazel.de 1188 : 2189 : LWLockRelease(ReplicationSlotControlLock);
 1189 : :
 4205 rhaas@postgresql.org 1190 : 2189 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
 4236 1191 : 2189 : }
1192 : :
1193 : : /*
1194 : : * Compute the oldest restart LSN across all slots and inform xlog module.
1195 : : *
1196 : : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1197 : : * purpose, we don't try to account for that, because this module doesn't
1198 : : * know what to compare against.
1199 : : */
 1200 : : void
 1201 : 64532 : ReplicationSlotsComputeRequiredLSN(void)
 1202 : : {
 1203 : : int i;
 4141 bruce@momjian.us 1204 : 64532 : XLogRecPtr min_required = InvalidXLogRecPtr;
 1205 : :
 4236 rhaas@postgresql.org 1206 [ - + ]: 64532 : Assert(ReplicationSlotCtl != NULL);
 1207 : :
 1208 : 64532 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 1209 [ + + ]: 707608 : for (i = 0; i < max_replication_slots; i++)
 1210 : : {
 1211 : 643076 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 1212 : : XLogRecPtr restart_lsn;
 1213 : : XLogRecPtr last_saved_restart_lsn;
 1214 : : bool invalidated;
 1215 : : ReplicationSlotPersistency persistency;
 1216 : :
 1217 [ + + ]: 643076 : if (!s->in_use)
 1218 : 578553 : continue;
 1219 : :
 : : /* Snapshot the fields under the per-slot spinlock; the shared LWLock only pins the array. */
 3623 1220 [ + + ]: 64523 : SpinLockAcquire(&s->mutex);
 84 akorotkov@postgresql 1221 : 64523 : persistency = s->data.persistency;
 3623 rhaas@postgresql.org 1222 : 64523 : restart_lsn = s->data.restart_lsn;
 883 andres@anarazel.de 1223 : 64523 : invalidated = s->data.invalidated != RS_INVAL_NONE;
 84 akorotkov@postgresql 1224 : 64523 : last_saved_restart_lsn = s->last_saved_restart_lsn;
 3623 rhaas@postgresql.org 1225 : 64523 : SpinLockRelease(&s->mutex);
 1226 : :
 1227 : : /* invalidated slots need not apply */
 883 andres@anarazel.de 1228 [ + + ]: 64523 : if (invalidated)
 1229 : 6 : continue;
 1230 : :
 1231 : : /*
 1232 : : * For persistent slot use last_saved_restart_lsn to compute the
 1233 : : * oldest LSN for removal of WAL segments. The segments between
 1234 : : * last_saved_restart_lsn and restart_lsn might be needed by a
 1235 : : * persistent slot in the case of database crash. Non-persistent
 1236 : : * slots can't survive the database crash, so we don't care about
 1237 : : * last_saved_restart_lsn for them.
 1238 : : */
 84 akorotkov@postgresql 1239 [ + + ]: 64517 : if (persistency == RS_PERSISTENT)
 1240 : : {
 1241 [ + + + + ]: 63752 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
 1242 : : restart_lsn > last_saved_restart_lsn)
 1243 : : {
 1244 : 61719 : restart_lsn = last_saved_restart_lsn;
 1245 : : }
 1246 : : }
 1247 : :
 4236 rhaas@postgresql.org 1248 [ + + + + ]: 64517 : if (restart_lsn != InvalidXLogRecPtr &&
 1249 [ + + ]: 984 : (min_required == InvalidXLogRecPtr ||
 1250 : : restart_lsn < min_required))
 1251 : 63624 : min_required = restart_lsn;
 1252 : : }
 1253 : 64532 : LWLockRelease(ReplicationSlotControlLock);
 1254 : :
 1255 : 64532 : XLogSetReplicationSlotMinimumLSN(min_required);
 1256 : 64532 : }
1257 : :
1258 : : /*
1259 : : * Compute the oldest WAL LSN required by *logical* decoding slots..
1260 : : *
1261 : : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1262 : : * slots exist.
1263 : : *
1264 : : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1265 : : * ignores physical replication slots.
1266 : : *
1267 : : * The results aren't required frequently, so we don't maintain a precomputed
1268 : : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1269 : : */
 1270 : : XLogRecPtr
 4205 1271 : 3354 : ReplicationSlotsComputeLogicalRestartLSN(void)
 1272 : : {
 1273 : 3354 : XLogRecPtr result = InvalidXLogRecPtr;
 1274 : : int i;
 1275 : :
 1276 [ + + ]: 3354 : if (max_replication_slots <= 0)
 4205 rhaas@postgresql.org 1277 :GBC 2 : return InvalidXLogRecPtr;
 1278 : :
 4205 rhaas@postgresql.org 1279 :CBC 3352 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 1280 : :
 : : /* Same last_saved_restart_lsn rule as ReplicationSlotsComputeRequiredLSN, restricted to logical slots. */
 1281 [ + + ]: 36384 : for (i = 0; i < max_replication_slots; i++)
 1282 : : {
 1283 : : ReplicationSlot *s;
 1284 : : XLogRecPtr restart_lsn;
 1285 : : XLogRecPtr last_saved_restart_lsn;
 1286 : : bool invalidated;
 1287 : : ReplicationSlotPersistency persistency;
 1288 : :
 1289 : 33032 : s = &ReplicationSlotCtl->replication_slots[i];
 1290 : :
 1291 : : /* cannot change while ReplicationSlotCtlLock is held */
 1292 [ + + ]: 33032 : if (!s->in_use)
 1293 : 32304 : continue;
 1294 : :
 1295 : : /* we're only interested in logical slots */
 3679 andres@anarazel.de 1296 [ + + ]: 728 : if (!SlotIsLogical(s))
 4205 rhaas@postgresql.org 1297 : 520 : continue;
 1298 : :
 1299 : : /* read once, it's ok if it increases while we're checking */
 1300 [ - + ]: 208 : SpinLockAcquire(&s->mutex);
 84 akorotkov@postgresql 1301 : 208 : persistency = s->data.persistency;
 4205 rhaas@postgresql.org 1302 : 208 : restart_lsn = s->data.restart_lsn;
 883 andres@anarazel.de 1303 : 208 : invalidated = s->data.invalidated != RS_INVAL_NONE;
 84 akorotkov@postgresql 1304 : 208 : last_saved_restart_lsn = s->last_saved_restart_lsn;
 4205 rhaas@postgresql.org 1305 : 208 : SpinLockRelease(&s->mutex);
 1306 : :
 1307 : : /* invalidated slots need not apply */
 883 andres@anarazel.de 1308 [ - + ]: 208 : if (invalidated)
 883 andres@anarazel.de 1309 :UBC 0 : continue;
 1310 : :
 1311 : : /*
 1312 : : * For persistent slot use last_saved_restart_lsn to compute the
 1313 : : * oldest LSN for removal of WAL segments. The segments between
 1314 : : * last_saved_restart_lsn and restart_lsn might be needed by a
 1315 : : * persistent slot in the case of database crash. Non-persistent
 1316 : : * slots can't survive the database crash, so we don't care about
 1317 : : * last_saved_restart_lsn for them.
 1318 : : */
 84 akorotkov@postgresql 1319 [ + + ]:CBC 208 : if (persistency == RS_PERSISTENT)
 1320 : : {
 1321 [ + - - + ]: 206 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
 1322 : : restart_lsn > last_saved_restart_lsn)
 1323 : : {
 84 akorotkov@postgresql 1324 :UBC 0 : restart_lsn = last_saved_restart_lsn;
 1325 : : }
 1326 : : }
 1327 : :
 1978 alvherre@alvh.no-ip. 1328 [ - + ]:CBC 208 : if (restart_lsn == InvalidXLogRecPtr)
 1978 alvherre@alvh.no-ip. 1329 :UBC 0 : continue;
 1330 : :
 4205 rhaas@postgresql.org 1331 [ + + + + ]:CBC 208 : if (result == InvalidXLogRecPtr ||
 1332 : : restart_lsn < result)
 1333 : 166 : result = restart_lsn;
 1334 : : }
 1335 : :
 1336 : 3352 : LWLockRelease(ReplicationSlotControlLock);
 1337 : :
 1338 : 3352 : return result;
 1339 : : }
1340 : :
1341 : : /*
1342 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1343 : : * passed database oid.
1344 : : *
1345 : : * Returns true if there are any slots referencing the database. *nslots will
1346 : : * be set to the absolute number of slots in the database, *nactive to ones
1347 : : * currently active.
1348 : : */
 1349 : : bool
 1350 : 45 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 1351 : : {
 1352 : : int i;
 1353 : :
 : : /* Outputs: *nslots = all logical slots for dboid (invalidated included); *nactive = those with a nonzero pid. */
 1354 : 45 : *nslots = *nactive = 0;
 1355 : :
 1356 [ - + ]: 45 : if (max_replication_slots <= 0)
 4205 rhaas@postgresql.org 1357 :UBC 0 : return false;
 1358 : :
 4205 rhaas@postgresql.org 1359 :CBC 45 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 1360 [ + + ]: 472 : for (i = 0; i < max_replication_slots; i++)
 1361 : : {
 1362 : : ReplicationSlot *s;
 1363 : :
 1364 : 427 : s = &ReplicationSlotCtl->replication_slots[i];
 1365 : :
 1366 : : /* cannot change while ReplicationSlotCtlLock is held */
 1367 [ + + ]: 427 : if (!s->in_use)
 1368 : 407 : continue;
 1369 : :
 1370 : : /* only logical slots are database specific, skip */
 3679 andres@anarazel.de 1371 [ + + ]: 20 : if (!SlotIsLogical(s))
 4201 bruce@momjian.us 1372 : 9 : continue;
 1373 : :
 1374 : : /* not our database, skip */
 4205 rhaas@postgresql.org 1375 [ + + ]: 11 : if (s->data.database != dboid)
 1376 : 8 : continue;
 1377 : :
 1378 : : /* NB: intentionally counting invalidated slots */
 1379 : :
 1380 : : /* count slots with spinlock held */
 1381 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
 1382 : 3 : (*nslots)++;
 3791 andres@anarazel.de 1383 [ + + ]: 3 : if (s->active_pid != 0)
 4205 rhaas@postgresql.org 1384 : 1 : (*nactive)++;
 1385 : 3 : SpinLockRelease(&s->mutex);
 1386 : : }
 1387 : 45 : LWLockRelease(ReplicationSlotControlLock);
 1388 : :
 1389 [ + + ]: 45 : if (*nslots > 0)
 1390 : 3 : return true;
 1391 : 42 : return false;
 1392 : : }
1393 : :
1394 : : /*
1395 : : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1396 : : * passed database oid. The caller should hold an exclusive lock on the
1397 : : * pg_database oid for the database to prevent creation of new slots on the db
1398 : : * or replay from existing slots.
1399 : : *
1400 : : * Another session that concurrently acquires an existing slot on the target DB
1401 : : * (most likely to drop it) may cause this function to ERROR. If that happens
1402 : : * it may have dropped some but not all slots.
1403 : : *
1404 : : * This routine isn't as efficient as it could be - but we don't drop
1405 : : * databases often, especially databases with lots of slots.
1406 : : */
 1407 : : void
 3084 simon@2ndQuadrant.co 1408 : 56 : ReplicationSlotsDropDBSlots(Oid dboid)
 1409 : : {
 1410 : : int i;
 1411 : :
 1412 [ + - ]: 56 : if (max_replication_slots <= 0)
 3084 simon@2ndQuadrant.co 1413 :UBC 0 : return;
 1414 : :
 : : /* Restart the scan after each drop: ReplicationSlotDropAcquired runs with the control lock released. */
 3084 simon@2ndQuadrant.co 1415 :CBC 56 : restart:
 1416 : 59 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 1417 [ + + ]: 592 : for (i = 0; i < max_replication_slots; i++)
 1418 : : {
 1419 : : ReplicationSlot *s;
 1420 : : char *slotname;
 1421 : : int active_pid;
 1422 : :
 1423 : 536 : s = &ReplicationSlotCtl->replication_slots[i];
 1424 : :
 1425 : : /* cannot change while ReplicationSlotCtlLock is held */
 1426 [ + + ]: 536 : if (!s->in_use)
 1427 : 510 : continue;
 1428 : :
 1429 : : /* only logical slots are database specific, skip */
 1430 [ + + ]: 26 : if (!SlotIsLogical(s))
 1431 : 10 : continue;
 1432 : :
 1433 : : /* not our database, skip */
 1434 [ + + ]: 16 : if (s->data.database != dboid)
 1435 : 13 : continue;
 1436 : :
 1437 : : /* NB: intentionally including invalidated slots */
 1438 : :
 1439 : : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
 1440 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
 1441 : : /* can't change while ReplicationSlotControlLock is held */
 3076 andres@anarazel.de 1442 : 3 : slotname = NameStr(s->data.name);
 3084 simon@2ndQuadrant.co 1443 : 3 : active_pid = s->active_pid;
 1444 [ + - ]: 3 : if (active_pid == 0)
 1445 : : {
 1446 : 3 : MyReplicationSlot = s;
 1447 : 3 : s->active_pid = MyProcPid;
 1448 : : }
 1449 : 3 : SpinLockRelease(&s->mutex);
 1450 : :
 1451 : : /*
 1452 : : * Even though we hold an exclusive lock on the database object a
 1453 : : * logical slot for that DB can still be active, e.g. if it's
 1454 : : * concurrently being dropped by a backend connected to another DB.
 1455 : : *
 1456 : : * That's fairly unlikely in practice, so we'll just bail out.
 1457 : : *
 1458 : : * The slot sync worker holds a shared lock on the database before
 1459 : : * operating on synced logical slots to avoid conflict with the drop
 1460 : : * happening here. The persistent synced slots are thus safe but there
 1461 : : * is a possibility that the slot sync worker has created a temporary
 1462 : : * slot (which stays active even on release) and we are trying to drop
 1463 : : * that here. In practice, the chances of hitting this scenario are
 1464 : : * less as during slot synchronization, the temporary slot is
 1465 : : * immediately converted to persistent and thus is safe due to the
 1466 : : * shared lock taken on the database. So, we'll just bail out in such
 1467 : : * a case.
 1468 : : *
 1469 : : * XXX: We can consider shutting down the slot sync worker before
 1470 : : * trying to drop synced temporary slots here.
 1471 : : */
 1472 [ - + ]: 3 : if (active_pid)
 3076 andres@anarazel.de 1473 [ # # ]:UBC 0 : ereport(ERROR,
 1474 : : (errcode(ERRCODE_OBJECT_IN_USE),
 1475 : : errmsg("replication slot \"%s\" is active for PID %d",
 1476 : : slotname, active_pid)));
 1477 : :
 1478 : : /*
 1479 : : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
 1480 : : * holding ReplicationSlotControlLock over filesystem operations,
 1481 : : * release ReplicationSlotControlLock and use
 1482 : : * ReplicationSlotDropAcquired.
 1483 : : *
 1484 : : * As that means the set of slots could change, restart scan from the
 1485 : : * beginning each time we release the lock.
 1486 : : */
 3084 simon@2ndQuadrant.co 1487 :CBC 3 : LWLockRelease(ReplicationSlotControlLock);
 1488 : 3 : ReplicationSlotDropAcquired();
 1489 : 3 : goto restart;
 1490 : : }
 1491 : 56 : LWLockRelease(ReplicationSlotControlLock);
 1492 : : }
1493 : :
1494 : :
1495 : : /*
1496 : : * Check whether the server's configuration supports using replication
1497 : : * slots.
1498 : : */
 1499 : : void
 4236 rhaas@postgresql.org 1500 : 1635 : CheckSlotRequirements(void)
 1501 : : {
 1502 : : /*
 1503 : : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
 1504 : : * needs the same check.
 1505 : : */
 1506 : :
 : : /* max_replication_slots == 0 disables the slot subsystem entirely. */
 1507 [ - + ]: 1635 : if (max_replication_slots == 0)
 4236 rhaas@postgresql.org 1508 [ # # ]:UBC 0 : ereport(ERROR,
 1509 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 1510 : : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
 1511 : :
 3477 peter_e@gmx.net 1512 [ - + ]:CBC 1635 : if (wal_level < WAL_LEVEL_REPLICA)
 4236 rhaas@postgresql.org 1513 [ # # ]:UBC 0 : ereport(ERROR,
 1514 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 1515 : : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
 4236 rhaas@postgresql.org 1516 :CBC 1635 : }
1517 : :
1518 : : /*
1519 : : * Check whether the user has privilege to use replication slots.
1520 : : */
 1521 : : void
 1453 michael@paquier.xyz 1522 : 524 : CheckSlotPermissions(void)
 1523 : : {
 : : /* Raises ERROR unless the current role has the REPLICATION attribute. */
 905 peter@eisentraut.org 1524 [ + + ]: 524 : if (!has_rolreplication(GetUserId()))
 1453 michael@paquier.xyz 1525 [ + - ]: 5 : ereport(ERROR,
 1526 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 1527 : : errmsg("permission denied to use replication slots"),
 1528 : : errdetail("Only roles with the %s attribute may use replication slots.",
 1529 : : "REPLICATION")));
 1530 : 519 : }
1531 : :
1532 : : /*
1533 : : * Reserve WAL for the currently active slot.
1534 : : *
1535 : : * Compute and set restart_lsn in a manner that's appropriate for the type of
1536 : : * the slot and concurrency safe.
1537 : : */
1538 : : void
3679 andres@anarazel.de 1539 : 569 : ReplicationSlotReserveWal(void)
1540 : : {
1541 : 569 : ReplicationSlot *slot = MyReplicationSlot;
1542 : :
1543 [ - + ]: 569 : Assert(slot != NULL);
1544 [ - + ]: 569 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
84 akorotkov@postgresql 1545 [ + - ]: 569 : Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
1546 : :
1547 : : /*
1548 : : * The replication slot mechanism is used to prevent removal of required
1549 : : * WAL. As there is no interlock between this routine and checkpoints, WAL
1550 : : * segments could concurrently be removed when a now stale return value of
1551 : : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1552 : : * this happens we'll just retry.
1553 : : */
1554 : : while (true)
3679 andres@anarazel.de 1555 :UBC 0 : {
1556 : : XLogSegNo segno;
1557 : : XLogRecPtr restart_lsn;
1558 : :
1559 : : /*
1560 : : * For logical slots log a standby snapshot and start logical decoding
1561 : : * at exactly that position. That allows the slot to start up more
1562 : : * quickly. But on a standby we cannot do WAL writes, so just use the
1563 : : * replay pointer; effectively, an attempt to create a logical slot on
1564 : : * standby will cause it to wait for an xl_running_xact record to be
1565 : : * logged independently on the primary, so that a snapshot can be
1566 : : * built using the record.
1567 : : *
1568 : : * None of this is needed (or indeed helpful) for physical slots as
1569 : : * they'll start replay at the last logged checkpoint anyway. Instead
1570 : : * return the location of the last redo LSN. While that slightly
1571 : : * increases the chance that we have to retry, it's where a base
1572 : : * backup has to start replay at.
1573 : : */
882 andres@anarazel.de 1574 [ + + ]:CBC 569 : if (SlotIsPhysical(slot))
1575 : 145 : restart_lsn = GetRedoRecPtr();
1576 [ - + ]: 424 : else if (RecoveryInProgress())
882 andres@anarazel.de 1577 :UBC 0 : restart_lsn = GetXLogReplayRecPtr(NULL);
1578 : : else
2643 michael@paquier.xyz 1579 :CBC 424 : restart_lsn = GetXLogInsertRecPtr();
1580 : :
882 andres@anarazel.de 1581 [ - + ]: 569 : SpinLockAcquire(&slot->mutex);
1582 : 569 : slot->data.restart_lsn = restart_lsn;
1583 : 569 : SpinLockRelease(&slot->mutex);
1584 : :
1585 : : /* prevent WAL removal as fast as possible */
3679 1586 : 569 : ReplicationSlotsComputeRequiredLSN();
1587 : :
1588 : : /*
1589 : : * If all required WAL is still there, great, otherwise retry. The
1590 : : * slot should prevent further removal of WAL, unless there's a
1591 : : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1592 : : * the new restart_lsn above, so normally we should never need to loop
1593 : : * more than twice.
1594 : : */
2909 1595 : 569 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
3679 1596 [ + - ]: 569 : if (XLogGetLastRemovedSegno() < segno)
1597 : 569 : break;
1598 : : }
1599 : :
882 1600 [ + + + + ]: 569 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1601 : : {
1602 : : XLogRecPtr flushptr;
1603 : :
1604 : : /* make sure we have enough information to start */
1605 : 424 : flushptr = LogStandbySnapshot();
1606 : :
1607 : : /* and make sure it's fsynced to disk */
1608 : 424 : XLogFlush(flushptr);
1609 : : }
3679 1610 : 569 : }
1611 : :
/*
 * Report that replication slot needs to be invalidated
 *
 * Emits a LOG message describing the invalidation of the slot "slotname"
 * for the given "cause".  If "terminating" is true, the message instead
 * announces that process "pid" is being terminated to release the slot.
 *
 * The remaining arguments carry cause-specific detail: restart_lsn and
 * oldestLSN for RS_INVAL_WAL_REMOVED, snapshotConflictHorizon for
 * RS_INVAL_HORIZON, and slot_idle_seconds for RS_INVAL_IDLE_TIMEOUT;
 * arguments not relevant to "cause" are ignored.
 */
static void
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
					   bool terminating,
					   int pid,
					   NameData slotname,
					   XLogRecPtr restart_lsn,
					   XLogRecPtr oldestLSN,
					   TransactionId snapshotConflictHorizon,
					   long slot_idle_seconds)
{
	StringInfoData err_detail;
	StringInfoData err_hint;

	initStringInfo(&err_detail);
	initStringInfo(&err_hint);

	/* build cause-specific detail (and, for some causes, a hint) */
	switch (cause)
	{
		case RS_INVAL_WAL_REMOVED:
			{
				/* how far behind the limit the slot's restart_lsn fell */
				uint64		ex = oldestLSN - restart_lsn;

				appendStringInfo(&err_detail,
								 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
										  "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
										  ex),
								 LSN_FORMAT_ARGS(restart_lsn),
								 ex);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "max_slot_wal_keep_size");
				break;
			}
		case RS_INVAL_HORIZON:
			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
							 snapshotConflictHorizon);
			break;

		case RS_INVAL_WAL_LEVEL:
			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
			break;

		case RS_INVAL_IDLE_TIMEOUT:
			{
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
								 slot_idle_seconds, "idle_replication_slot_timeout",
								 idle_replication_slot_timeout_secs);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "idle_replication_slot_timeout");
				break;
			}
		case RS_INVAL_NONE:
			pg_unreachable();
	}

	/* only attach a hint when one was produced for this cause above */
	ereport(LOG,
			terminating ?
			errmsg("terminating process %d to release replication slot \"%s\"",
				   pid, NameStr(slotname)) :
			errmsg("invalidating obsolete replication slot \"%s\"",
				   NameStr(slotname)),
			errdetail_internal("%s", err_detail.data),
			err_hint.len ? errhint("%s", err_hint.data) : 0);

	pfree(err_detail.data);
	pfree(err_hint.data);
}
1684 : :
1685 : : /*
1686 : : * Can we invalidate an idle replication slot?
1687 : : *
1688 : : * Idle timeout invalidation is allowed only when:
1689 : : *
1690 : : * 1. Idle timeout is set
1691 : : * 2. Slot has reserved WAL
1692 : : * 3. Slot is inactive
1693 : : * 4. The slot is not being synced from the primary while the server is in
1694 : : * recovery. This is because synced slots are always considered to be
1695 : : * inactive because they don't perform logical decoding to produce changes.
1696 : : */
1697 : : static inline bool
1698 : 349 : CanInvalidateIdleSlot(ReplicationSlot *s)
1699 : : {
57 fujii@postgresql.org 1700 : 349 : return (idle_replication_slot_timeout_secs != 0 &&
199 akapila@postgresql.o 1701 [ # # ]:UBC 0 : !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
199 akapila@postgresql.o 1702 [ - + - - ]:CBC 349 : s->inactive_since > 0 &&
199 akapila@postgresql.o 1703 [ # # # # ]:UBC 0 : !(RecoveryInProgress() && s->data.synced));
1704 : : }
1705 : :
/*
 * DetermineSlotInvalidationCause - Determine the cause for which a slot
 * becomes invalid among the given possible causes.
 *
 * This function sequentially checks all possible invalidation causes and
 * returns the first one for which the slot is eligible for invalidation.
 *
 * possible_causes is a bitmask of RS_INVAL_* values to consider.  The
 * initial_* arguments are the xmin/restart_lsn values captured by the caller
 * (possibly before terminating a previous owner of the slot), so that the
 * reported cause reflects what was originally observed rather than state
 * that changed afterwards.  "now" must be a valid timestamp whenever
 * RS_INVAL_IDLE_TIMEOUT is in the mask.
 *
 * On returning RS_INVAL_IDLE_TIMEOUT, *inactive_since is set to the start of
 * the slot's idle period; it is left untouched for every other result.
 *
 * Note: the caller (InvalidatePossiblyObsoleteSlot) invokes this with the
 * slot's spinlock held, so this must remain cheap and must not sleep.
 */
static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
							   XLogRecPtr oldestLSN, Oid dboid,
							   TransactionId snapshotConflictHorizon,
							   TransactionId initial_effective_xmin,
							   TransactionId initial_catalog_effective_xmin,
							   XLogRecPtr initial_restart_lsn,
							   TimestampTz *inactive_since, TimestampTz now)
{
	Assert(possible_causes != RS_INVAL_NONE);

	if (possible_causes & RS_INVAL_WAL_REMOVED)
	{
		/* slot requires WAL older than the oldest still-available LSN? */
		if (initial_restart_lsn != InvalidXLogRecPtr &&
			initial_restart_lsn < oldestLSN)
			return RS_INVAL_WAL_REMOVED;
	}

	if (possible_causes & RS_INVAL_HORIZON)
	{
		/* invalid DB oid signals a shared relation */
		if (SlotIsLogical(s) &&
			(dboid == InvalidOid || dboid == s->data.database))
		{
			/* either xmin crossing the conflict horizon invalidates */
			if (TransactionIdIsValid(initial_effective_xmin) &&
				TransactionIdPrecedesOrEquals(initial_effective_xmin,
											  snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
			else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
					 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
												   snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
		}
	}

	if (possible_causes & RS_INVAL_WAL_LEVEL)
	{
		/* only logical slots depend on wal_level being high enough */
		if (SlotIsLogical(s))
			return RS_INVAL_WAL_LEVEL;
	}

	if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
	{
		Assert(now > 0);

		if (CanInvalidateIdleSlot(s))
		{
			/*
			 * Simulate the invalidation due to idle_timeout to test the
			 * timeout behavior promptly, without waiting for it to trigger
			 * naturally.
			 */
#ifdef USE_INJECTION_POINTS
			if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
			{
				*inactive_since = 0;	/* since the beginning of time */
				return RS_INVAL_IDLE_TIMEOUT;
			}
#endif

			/*
			 * Check if the slot needs to be invalidated due to
			 * idle_replication_slot_timeout GUC.
			 */
			if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
												  idle_replication_slot_timeout_secs))
			{
				*inactive_since = s->inactive_since;
				return RS_INVAL_IDLE_TIMEOUT;
			}
		}
	}

	return RS_INVAL_NONE;
}
1788 : :
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and mark it invalid, if necessary and possible.
 *
 * possible_causes is the mask of RS_INVAL_* causes to test; oldestLSN, dboid
 * and snapshotConflictHorizon parameterize the individual causes (see
 * DetermineSlotInvalidationCause).
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;
	bool		terminated = false;
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
	TimestampTz inactive_since = 0;

	/*
	 * Loop until either the slot needs no invalidation, or we have acquired
	 * and invalidated it ourselves.  Each iteration may terminate the
	 * current owner (if any) and wait for it to release the slot.
	 */
	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/* slot dropped while we slept: nothing left to do */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 *
			 * Unlike other slot attributes, slot's inactive_since can't be
			 * changed until the acquired slot is released or the owning
			 * process is terminated. So, the inactive slot can only be
			 * invalidated immediately without being terminated.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																initial_effective_xmin,
																initial_catalog_effective_xmin,
																initial_restart_lsn,
																&inactive_since,
																now);
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				/* the startup process uses a recovery-conflict signal */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
2033 : :
2034 : : /*
2035 : : * Invalidate slots that require resources about to be removed.
2036 : : *
2037 : : * Returns true when any slot have got invalidated.
2038 : : *
2039 : : * Whether a slot needs to be invalidated depends on the invalidation cause.
2040 : : * A slot is invalidated if it:
2041 : : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2042 : : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2043 : : * db; dboid may be InvalidOid for shared relations
2044 : : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2045 : : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2046 : : * "idle_replication_slot_timeout" duration.
2047 : : *
2048 : : * Note: This function attempts to invalidate the slot for multiple possible
2049 : : * causes in a single pass, minimizing redundant iterations. The "cause"
2050 : : * parameter can be a MASK representing one or more of the defined causes.
2051 : : *
2052 : : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2053 : : */
2054 : : bool
199 akapila@postgresql.o 2055 : 1680 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2056 : : XLogSegNo oldestSegno, Oid dboid,
2057 : : TransactionId snapshotConflictHorizon)
2058 : : {
2059 : : XLogRecPtr oldestLSN;
1513 alvherre@alvh.no-ip. 2060 : 1680 : bool invalidated = false;
2061 : :
199 akapila@postgresql.o 2062 [ - + - - ]: 1680 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2063 [ + + - + ]: 1680 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2064 [ - + ]: 1680 : Assert(possible_causes != RS_INVAL_NONE);
2065 : :
883 andres@anarazel.de 2066 [ + + ]: 1680 : if (max_replication_slots == 0)
883 andres@anarazel.de 2067 :GBC 1 : return invalidated;
2068 : :
1548 alvherre@alvh.no-ip. 2069 :CBC 1679 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2070 : :
2071 : 1683 : restart:
2072 : 1683 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2073 [ + + ]: 18229 : for (int i = 0; i < max_replication_slots; i++)
2074 : : {
2075 : 16550 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2076 : :
2077 [ + + ]: 16550 : if (!s->in_use)
2078 : 16182 : continue;
2079 : :
2080 : : /* Prevent invalidation of logical slots during binary upgrade */
57 akapila@postgresql.o 2081 [ + + + + ]: 368 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2082 : 7 : continue;
2083 : :
199 2084 [ + + ]: 361 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2085 : : snapshotConflictHorizon,
2086 : : &invalidated))
2087 : : {
2088 : : /* if the lock was released, start from scratch */
1548 alvherre@alvh.no-ip. 2089 : 4 : goto restart;
2090 : : }
2091 : : }
1978 2092 : 1679 : LWLockRelease(ReplicationSlotControlLock);
2093 : :
2094 : : /*
2095 : : * If any slots have been invalidated, recalculate the resource limits.
2096 : : */
1513 2097 [ + + ]: 1679 : if (invalidated)
2098 : : {
2099 : 4 : ReplicationSlotsComputeRequiredXmin(false);
2100 : 4 : ReplicationSlotsComputeRequiredLSN();
2101 : : }
2102 : :
2103 : 1679 : return invalidated;
2104 : : }
2105 : :
2106 : : /*
2107 : : * Flush all replication slots to disk.
2108 : : *
2109 : : * It is convenient to flush dirty replication slots at the time of checkpoint.
2110 : : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2111 : : * for which the confirmed_flush LSN has been updated since the last time it
2112 : : * was saved and flush them.
2113 : : */
2114 : : void
723 akapila@postgresql.o 2115 : 1677 : CheckPointReplicationSlots(bool is_shutdown)
2116 : : {
2117 : : int i;
71 akorotkov@postgresql 2118 : 1677 : bool last_saved_restart_lsn_updated = false;
2119 : :
3952 peter_e@gmx.net 2120 [ + + ]: 1677 : elog(DEBUG1, "performing replication slot checkpoint");
2121 : :
2122 : : /*
2123 : : * Prevent any slot from being created/dropped while we're active. As we
2124 : : * explicitly do *not* want to block iterating over replication_slots or
2125 : : * acquiring a slot we cannot take the control lock - but that's OK,
2126 : : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2127 : : * enough to guarantee that nobody can change the in_use bits on us.
2128 : : */
4236 rhaas@postgresql.org 2129 : 1677 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2130 : :
2131 [ + + ]: 18193 : for (i = 0; i < max_replication_slots; i++)
2132 : : {
2133 : 16516 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2134 : : char path[MAXPGPATH];
2135 : :
2136 [ + + ]: 16516 : if (!s->in_use)
2137 : 16152 : continue;
2138 : :
2139 : : /* save the slot to disk, locking is handled in SaveSlotToPath() */
372 michael@paquier.xyz 2140 : 364 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2141 : :
2142 : : /*
2143 : : * Slot's data is not flushed each time the confirmed_flush LSN is
2144 : : * updated as that could lead to frequent writes. However, we decide
2145 : : * to force a flush of all logical slot's data at the time of shutdown
2146 : : * if the confirmed_flush LSN is changed since we last flushed it to
2147 : : * disk. This helps in avoiding an unnecessary retreat of the
2148 : : * confirmed_flush LSN after restart.
2149 : : */
723 akapila@postgresql.o 2150 [ + + + + ]: 364 : if (is_shutdown && SlotIsLogical(s))
2151 : : {
2152 [ - + ]: 77 : SpinLockAcquire(&s->mutex);
2153 : :
2154 [ + - ]: 77 : if (s->data.invalidated == RS_INVAL_NONE &&
452 2155 [ + + ]: 77 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2156 : : {
723 2157 : 35 : s->just_dirtied = true;
2158 : 35 : s->dirty = true;
2159 : : }
2160 : 77 : SpinLockRelease(&s->mutex);
2161 : : }
2162 : :
2163 : : /*
2164 : : * Track if we're going to update slot's last_saved_restart_lsn. We
2165 : : * need this to know if we need to recompute the required LSN.
2166 : : */
71 akorotkov@postgresql 2167 [ + + ]: 364 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2168 : 198 : last_saved_restart_lsn_updated = true;
2169 : :
4236 rhaas@postgresql.org 2170 : 364 : SaveSlotToPath(s, path, LOG);
2171 : : }
2172 : 1677 : LWLockRelease(ReplicationSlotAllocationLock);
2173 : :
2174 : : /*
2175 : : * Recompute the required LSN if SaveSlotToPath() updated
2176 : : * last_saved_restart_lsn for any slot.
2177 : : */
71 akorotkov@postgresql 2178 [ + + ]: 1677 : if (last_saved_restart_lsn_updated)
2179 : 198 : ReplicationSlotsComputeRequiredLSN();
4236 rhaas@postgresql.org 2180 : 1677 : }
2181 : :
2182 : : /*
2183 : : * Load all replication slots from disk into memory at server startup. This
2184 : : * needs to be run before we start crash recovery.
2185 : : */
2186 : : void
4104 andres@anarazel.de 2187 : 887 : StartupReplicationSlots(void)
2188 : : {
2189 : : DIR *replication_dir;
2190 : : struct dirent *replication_de;
2191 : :
3952 peter_e@gmx.net 2192 [ + + ]: 887 : elog(DEBUG1, "starting up replication slots");
2193 : :
2194 : : /* restore all slots by iterating over all on-disk entries */
372 michael@paquier.xyz 2195 : 887 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2196 [ + + ]: 2757 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2197 : : {
2198 : : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2199 : : PGFileType de_type;
2200 : :
4236 rhaas@postgresql.org 2201 [ + + ]: 1870 : if (strcmp(replication_de->d_name, ".") == 0 ||
2202 [ + + ]: 983 : strcmp(replication_de->d_name, "..") == 0)
2203 : 1774 : continue;
2204 : :
372 michael@paquier.xyz 2205 : 96 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
1100 2206 : 96 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2207 : :
2208 : : /* we're only creating directories here, skip if it's not our's */
2209 [ + - - + ]: 96 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
4236 rhaas@postgresql.org 2210 :UBC 0 : continue;
2211 : :
2212 : : /* we crashed while a slot was being setup or deleted, clean up */
3899 andres@anarazel.de 2213 [ - + ]:CBC 96 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2214 : : {
4236 rhaas@postgresql.org 2215 [ # # ]:UBC 0 : if (!rmtree(path, true))
2216 : : {
2217 [ # # ]: 0 : ereport(WARNING,
2218 : : (errmsg("could not remove directory \"%s\"",
2219 : : path)));
2220 : 0 : continue;
2221 : : }
372 michael@paquier.xyz 2222 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
4236 rhaas@postgresql.org 2223 : 0 : continue;
2224 : : }
2225 : :
2226 : : /* looks like a slot in a normal state, restore */
4236 rhaas@postgresql.org 2227 :CBC 96 : RestoreSlotFromDisk(replication_de->d_name);
2228 : : }
2229 : 887 : FreeDir(replication_dir);
2230 : :
2231 : : /* currently no slots exist, we're done. */
2232 [ + + ]: 887 : if (max_replication_slots <= 0)
4236 rhaas@postgresql.org 2233 :GBC 1 : return;
2234 : :
2235 : : /* Now that we have recovered all the data, compute replication xmin */
4205 rhaas@postgresql.org 2236 :CBC 886 : ReplicationSlotsComputeRequiredXmin(false);
4236 2237 : 886 : ReplicationSlotsComputeRequiredLSN();
2238 : : }
2239 : :
2240 : : /* ----
2241 : : * Manipulation of on-disk state of replication slots
2242 : : *
2243 : : * NB: none of the routines below should take any notice whether a slot is the
2244 : : * current one or not, that's all handled a layer above.
2245 : : * ----
2246 : : */
2247 : : static void
2248 : 611 : CreateSlotOnDisk(ReplicationSlot *slot)
2249 : : {
2250 : : char tmppath[MAXPGPATH];
2251 : : char path[MAXPGPATH];
2252 : : struct stat st;
2253 : :
2254 : : /*
2255 : : * No need to take out the io_in_progress_lock, nobody else can see this
2256 : : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2257 : : * takes out the lock, if we'd take the lock here, we'd deadlock.
2258 : : */
2259 : :
372 michael@paquier.xyz 2260 : 611 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2261 : 611 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2262 : :
2263 : : /*
2264 : : * It's just barely possible that some previous effort to create or drop a
2265 : : * slot with this name left a temp directory lying around. If that seems
2266 : : * to be the case, try to remove it. If the rmtree() fails, we'll error
2267 : : * out at the MakePGDirectory() below, so we don't bother checking
2268 : : * success.
2269 : : */
4236 rhaas@postgresql.org 2270 [ - + - - ]: 611 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
4236 rhaas@postgresql.org 2271 :UBC 0 : rmtree(tmppath, true);
2272 : :
2273 : : /* Create and fsync the temporary slot directory. */
2709 sfrost@snowman.net 2274 [ - + ]:CBC 611 : if (MakePGDirectory(tmppath) < 0)
4236 rhaas@postgresql.org 2275 [ # # ]:UBC 0 : ereport(ERROR,
2276 : : (errcode_for_file_access(),
2277 : : errmsg("could not create directory \"%s\": %m",
2278 : : tmppath)));
4236 rhaas@postgresql.org 2279 :CBC 611 : fsync_fname(tmppath, true);
2280 : :
2281 : : /* Write the actual state file. */
4141 bruce@momjian.us 2282 : 611 : slot->dirty = true; /* signal that we really need to write */
4236 rhaas@postgresql.org 2283 : 611 : SaveSlotToPath(slot, tmppath, ERROR);
2284 : :
2285 : : /* Rename the directory into place. */
2286 [ - + ]: 611 : if (rename(tmppath, path) != 0)
4236 rhaas@postgresql.org 2287 [ # # ]:UBC 0 : ereport(ERROR,
2288 : : (errcode_for_file_access(),
2289 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2290 : : tmppath, path)));
2291 : :
2292 : : /*
2293 : : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2294 : : * would persist after an OS crash or not - so, force a restart. The
2295 : : * restart would try to fsync this again till it works.
2296 : : */
4236 rhaas@postgresql.org 2297 :CBC 611 : START_CRIT_SECTION();
2298 : :
2299 : 611 : fsync_fname(path, true);
372 michael@paquier.xyz 2300 : 611 : fsync_fname(PG_REPLSLOT_DIR, true);
2301 : :
4236 rhaas@postgresql.org 2302 [ - + ]: 611 : END_CRIT_SECTION();
2303 : 611 : }
2304 : :
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes the slot's persistent data into <dir>/state using the
 * write-temp-file-then-rename pattern so a crash can never leave a torn
 * state file in place.  "elevel" is the level used to report I/O failures;
 * when it is below ERROR, locks are released and the function returns
 * without saving (the slot stays dirty, so a later save will retry).
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* serialize concurrent saves of the same slot */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data under the mutex for a consistent snapshot */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/* checksum covers everything past the non-checksummed header prefix */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2460 : :
/*
 * Load a single slot from disk into memory.
 *
 * "name" is the slot's directory name under PG_REPLSLOT_DIR.  On-disk
 * corruption or unexpected state is reported at PANIC, since we must not
 * run without the resource reservations a persistent slot represents.
 * Runs during startup, before any concurrent slot access is possible, so
 * no locking is taken.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		/* a logical slot: needs wal_level >= logical */
		if (wal_level < WAL_LEVEL_LOGICAL)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"logical\" or higher.")));

		/*
		 * In standby mode, the hot standby must be enabled. This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level. Otherwise, if the
		 * primary reduces wal_level < logical while hot standby is disabled,
		 * logical slots would remain valid even after promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* claim the first free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it. Use the same inactive_since
		 * time for all the slots.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2701 : :
2702 : : /*
2703 : : * Maps an invalidation reason for a replication slot to
2704 : : * ReplicationSlotInvalidationCause.
2705 : : */
2706 : : ReplicationSlotInvalidationCause
199 akapila@postgresql.o 2707 : 0 : GetSlotInvalidationCause(const char *cause_name)
2708 : : {
2709 [ # # ]: 0 : Assert(cause_name);
2710 : :
2711 : : /* Search lookup table for the cause having this name */
2712 [ # # ]: 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2713 : : {
2714 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2715 : 0 : return SlotInvalidationCauses[i].cause;
2716 : : }
2717 : :
2718 : 0 : Assert(false);
2719 : : return RS_INVAL_NONE; /* to keep compiler quiet */
2720 : : }
2721 : :
2722 : : /*
2723 : : * Maps an ReplicationSlotInvalidationCause to the invalidation
2724 : : * reason for a replication slot.
2725 : : */
2726 : : const char *
199 akapila@postgresql.o 2727 :CBC 4 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2728 : : {
2729 : : /* Search lookup table for the name of this cause */
2730 [ + - ]: 8 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2731 : : {
2732 [ + + ]: 8 : if (SlotInvalidationCauses[i].cause == cause)
2733 : 4 : return SlotInvalidationCauses[i].cause_name;
2734 : : }
2735 : :
199 akapila@postgresql.o 2736 :UBC 0 : Assert(false);
2737 : : return "none"; /* to keep compiler quiet */
2738 : : }
2739 : :
/*
 * A helper function to validate slots specified in GUC synchronized_standby_slots.
 *
 * The rawname will be parsed, and the result will be saved into *elemlist.
 *
 * Returns true if the list parses and (when we can check) every named slot
 * exists and is a physical slot; on failure, the reason is reported via
 * GUC_check_errdetail() and false is returned.  rawname is scribbled on by
 * SplitIdentifierString().
 */
static bool
validate_sync_standby_slots(char *rawname, List **elemlist)
{
	bool		ok;

	/* Verify syntax and parse string into a list of identifiers */
	ok = SplitIdentifierString(rawname, ',', elemlist);

	if (!ok)
	{
		GUC_check_errdetail("List syntax is invalid.");
	}
	else if (MyProc)
	{
		/*
		 * Check that each specified slot exist and is physical.
		 *
		 * Because we need an LWLock, we cannot do this on processes without a
		 * PGPROC, so we skip it there; but see comments in
		 * StandbySlotsHaveCaughtup() as to why that's not a problem.
		 */
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

		foreach_ptr(char, name, *elemlist)
		{
			ReplicationSlot *slot;

			slot = SearchNamedReplicationSlot(name, false);

			if (!slot)
			{
				GUC_check_errdetail("Replication slot \"%s\" does not exist.",
									name);
				ok = false;
				break;
			}

			/* a logical slot cannot be used as a standby synchronization target */
			if (!SlotIsPhysical(slot))
			{
				GUC_check_errdetail("\"%s\" is not a physical replication slot.",
									name);
				ok = false;
				break;
			}
		}

		LWLockRelease(ReplicationSlotControlLock);
	}

	return ok;
}
2796 : :
/*
 * GUC check_hook for synchronized_standby_slots
 *
 * For a nonempty, valid slot list this builds a SyncStandbySlotsConfigData
 * holding the slot count and the names packed back-to-back as NUL-terminated
 * strings, and hands it to the GUC machinery via *extra (which is why it
 * must be guc_malloc'd rather than palloc'd).
 */
bool
check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
{
	char	   *rawname;
	char	   *ptr;
	List	   *elemlist;
	int			size;
	bool		ok;
	SyncStandbySlotsConfigData *config;

	/* an empty setting needs no validation and no extra struct */
	if ((*newval)[0] == '\0')
		return true;

	/* Need a modifiable copy of the GUC string */
	rawname = pstrdup(*newval);

	/* Now verify if the specified slots exist and have correct type */
	ok = validate_sync_standby_slots(rawname, &elemlist);

	if (!ok || elemlist == NIL)
	{
		pfree(rawname);
		list_free(elemlist);
		return ok;
	}

	/* Compute the size required for the SyncStandbySlotsConfigData struct */
	size = offsetof(SyncStandbySlotsConfigData, slot_names);
	foreach_ptr(char, slot_name, elemlist)
		size += strlen(slot_name) + 1;

	/* GUC extra value must be guc_malloc'd, not palloc'd */
	config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
	if (!config)
		return false;

	/* Transform the data into SyncStandbySlotsConfigData */
	config->nslotnames = list_length(elemlist);

	/* copy each name, advancing past its terminating NUL */
	ptr = config->slot_names;
	foreach_ptr(char, slot_name, elemlist)
	{
		strcpy(ptr, slot_name);
		ptr += strlen(slot_name) + 1;
	}

	*extra = config;

	pfree(rawname);
	list_free(elemlist);
	return true;
}
2852 : :
/*
 * GUC assign_hook for synchronized_standby_slots
 *
 * "extra" is the SyncStandbySlotsConfigData built by the check_hook
 * (presumably NULL when the GUC is empty, since the check_hook only builds
 * it for a nonempty list — confirm against GUC machinery); it becomes the
 * active configuration here.
 */
void
assign_synchronized_standby_slots(const char *newval, void *extra)
{
	/*
	 * The standby slots may have changed, so we must recompute the oldest
	 * LSN.
	 */
	ss_oldest_flush_lsn = InvalidXLogRecPtr;

	synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
}
2867 : :
2868 : : /*
2869 : : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2870 : : */
2871 : : bool
432 2872 : 62195 : SlotExistsInSyncStandbySlots(const char *slot_name)
2873 : : {
2874 : : const char *standby_slot_name;
2875 : :
2876 : : /* Return false if there is no value in synchronized_standby_slots */
2877 [ + + ]: 62195 : if (synchronized_standby_slots_config == NULL)
547 2878 : 62189 : return false;
2879 : :
2880 : : /*
2881 : : * XXX: We are not expecting this list to be long so a linear search
2882 : : * shouldn't hurt but if that turns out not to be true then we can cache
2883 : : * this information for each WalSender as well.
2884 : : */
432 2885 : 6 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2886 [ + - ]: 6 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2887 : : {
547 2888 [ + - ]: 6 : if (strcmp(standby_slot_name, slot_name) == 0)
2889 : 6 : return true;
2890 : :
547 akapila@postgresql.o 2891 :UBC 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2892 : : }
2893 : :
2894 : 0 : return false;
2895 : : }
2896 : :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 *
		 * Though validate_sync_standby_slots (the GUC check_hook) tries to
		 * avoid this, it can nonetheless happen because the user can specify
		 * a nonexistent slot name before server startup. That function cannot
		 * validate such a slot during startup, as ReplicationSlotCtl is not
		 * initialized by then. Also, the user might have dropped one slot.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* read the fields we need under the slot's mutex */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up, so the standbys as a whole
			 * haven't; no point checking the remaining slots.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* track the minimum restart_lsn over all caught-up slots */
		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* names are packed back-to-back; advance past the terminating NUL */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3051 : :
/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 *
 * Sleeps on WalSndCtl->wal_confirm_rcv_cv with a 1s timeout so that GUC
 * changes (e.g. removal of a lagging slot from the list) are noticed
 * promptly via ProcessConfigFile().
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	/*
	 * Don't need to wait for the standby to catch up if the current acquired
	 * slot is not a logical failover slot, or there is no value in
	 * synchronized_standby_slots.
	 */
	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
		return;

	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		/* re-read the config, in case synchronized_standby_slots changed */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/* Exit if done waiting for every slot. */
		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
			break;

		/*
		 * Wait for the slots in the synchronized_standby_slots to catch up,
		 * but use a timeout (1s) so we can also check if the
		 * synchronized_standby_slots has been changed.
		 */
		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	}

	ConditionVariableCancelSleep();
}
|