Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the directory
24 : : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : : * contain the slot's own data. Additional data can be stored alongside that
26 : : * file if required. While the server is running, the state data is also
27 : : * cached in memory for efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/logicallauncher.h"
51 : : #include "replication/slotsync.h"
52 : : #include "replication/slot.h"
53 : : #include "replication/walsender_private.h"
54 : : #include "storage/fd.h"
55 : : #include "storage/ipc.h"
56 : : #include "storage/proc.h"
57 : : #include "storage/procarray.h"
58 : : #include "storage/subsystems.h"
59 : : #include "utils/builtins.h"
60 : : #include "utils/guc_hooks.h"
61 : : #include "utils/injection_point.h"
62 : : #include "utils/varlena.h"
63 : : #include "utils/wait_event.h"
64 : :
/*
 * Replication slot on-disk data structure.
 *
 * This is the image written to the slot's state file.  The header fields
 * (magic, checksum, version, length) form the version-independent prefix;
 * the ReplicationSlotOnDisk*Size macros derive the checksummed and
 * version-dependent spans from offsetof() on this struct, so the field order
 * here is part of the on-disk format and must not be rearranged.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* compared against SLOT_MAGIC, presumably on
								 * read -- TODO confirm */
	pg_crc32c	checksum;		/* CRC over everything from 'version' onward */

	/* data covered by checksum */
	uint32		version;
	uint32		length;			/* NOTE(review): presumably the byte length of
								 * the version-dependent data that follows;
								 * confirm against the read/write paths */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
87 : :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.  Hence the names are packed inline after the
 * count instead of being referenced through pointers.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 * To visit them, start at slot_names and step past each string's
	 * terminating NUL to reach the next one.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
105 : :
/*
 * Lookup table for slot invalidation causes.
 *
 * Each entry pairs a ReplicationSlotInvalidationCause value with the name
 * reported to users.  NOTE(review): lookups presumably search this table by
 * the 'cause' field rather than indexing it by enum value -- confirm before
 * relying on entry order.
 */
typedef struct SlotInvalidationCauseMap
{
	ReplicationSlotInvalidationCause cause;
	const char *cause_name;
} SlotInvalidationCauseMap;

static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
	{RS_INVAL_NONE, "none"},
	{RS_INVAL_WAL_REMOVED, "wal_removed"},
	{RS_INVAL_HORIZON, "rows_removed"},
	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
};

/*
 * Ensure that the lookup table is up-to-date with the enums defined in
 * ReplicationSlotInvalidationCause.  The "+ 1" presumably accounts for the
 * RS_INVAL_NONE entry, which is not itself an invalidation cause.
 */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
129 : :
/*
 * Size helpers for ReplicationSlotOnDisk.  Every expansion is fully
 * parenthesized so the macros can be used safely inside larger arithmetic
 * expressions (e.g. "offset + ReplicationSlotOnDiskChecksummedSize" or
 * "2 * ReplicationSlotOnDiskV2Size").
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
145 : :
/*
 * Control array for replication slot management.  Set up through the shmem
 * callbacks below; stays NULL when ReplicationSlotsShmemRequest() finds that
 * no slots of either kind are configured and registers nothing.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

static void ReplicationSlotsShmemRequest(void *arg);
static void ReplicationSlotsShmemInit(void *arg);

/* Hooks through which this module requests and initializes its shmem. */
const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
	.request_fn = ReplicationSlotsShmemRequest,
	.init_fn = ReplicationSlotsShmemInit,
};
156 : :
/*
 * My backend's replication slot in the shared memory array, or NULL when
 * this backend does not currently own a slot.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/*
 * GUC variables.  The shared array holds max_replication_slots general slots
 * followed by max_repack_replication_slots slots reserved for REPACK.
 */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */
int			max_repack_replication_slots = 5;	/* the maximum number of slots
												 * for REPACK */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration (presumably in seconds, per the _secs suffix); '0' disables it.
 */
int			idle_replication_slot_timeout_secs = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the
 * synchronized_standby_slots GUC.  InvalidXLogRecPtr until first computed.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
196 : : /*
197 : : * Register shared memory space needed for replication slots.
198 : : */
199 : : static void
54 heikki.linnakangas@i 200 :GNC 1251 : ReplicationSlotsShmemRequest(void *arg)
201 : : {
202 : : Size size;
203 : :
53 alvherre@kurilemu.de 204 [ - + ]: 1251 : if (max_replication_slots + max_repack_replication_slots == 0)
54 heikki.linnakangas@i 205 :UNC 0 : return;
206 : :
4502 rhaas@postgresql.org 207 :CBC 1251 : size = offsetof(ReplicationSlotCtlData, replication_slots);
208 : 1251 : size = add_size(size,
53 alvherre@kurilemu.de 209 :GNC 1251 : mul_size(max_replication_slots + max_repack_replication_slots,
210 : : sizeof(ReplicationSlot)));
54 heikki.linnakangas@i 211 : 1251 : ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 : : .size = size,
213 : : .ptr = (void **) &ReplicationSlotCtl,
214 : : );
215 : : }
216 : :
217 : : /*
218 : : * Initialize shared memory for replication slots.
219 : : */
220 : : static void
221 : 1248 : ReplicationSlotsShmemInit(void *arg)
222 : : {
53 alvherre@kurilemu.de 223 [ + + ]: 19760 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
224 : : {
54 heikki.linnakangas@i 225 : 18512 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
226 : :
227 : : /* everything else is zeroed by the memset above */
228 : 18512 : slot->active_proc = INVALID_PROC_NUMBER;
229 : 18512 : SpinLockInit(&slot->mutex);
230 : 18512 : LWLockInitialize(&slot->io_in_progress_lock,
231 : : LWTRANCHE_REPLICATION_SLOT_IO);
232 : 18512 : ConditionVariableInit(&slot->active_cv);
233 : : }
4502 rhaas@postgresql.org 234 :GIC 1248 : }
235 : :
236 : : /*
237 : : * Register the callback for replication slot cleanup and releasing.
238 : : */
239 : : void
1566 andres@anarazel.de 240 :CBC 23091 : ReplicationSlotInitialize(void)
241 : : {
242 : 23091 : before_shmem_exit(ReplicationSlotShmemExit, 0);
243 : 23091 : }
244 : :
245 : : /*
246 : : * Release and cleanup replication slots.
247 : : */
248 : : static void
249 : 23091 : ReplicationSlotShmemExit(int code, Datum arg)
250 : : {
251 : : /* Make sure active replication slots are released */
252 [ + + ]: 23091 : if (MyReplicationSlot != NULL)
253 : 289 : ReplicationSlotRelease();
254 : :
255 : : /* Also cleanup all the temporary slots. */
765 akapila@postgresql.o 256 : 23091 : ReplicationSlotCleanup(false);
1566 andres@anarazel.de 257 : 23091 : }
258 : :
259 : : /*
260 : : * Check whether the passed slot name is valid and report errors at elevel.
261 : : *
262 : : * See comments for ReplicationSlotValidateNameInternal().
263 : : */
264 : : bool
220 fujii@postgresql.org 265 :GNC 870 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
266 : : int elevel)
267 : : {
268 : : int err_code;
220 fujii@postgresql.org 269 :CBC 870 : char *err_msg = NULL;
270 : 870 : char *err_hint = NULL;
271 : :
220 fujii@postgresql.org 272 [ + + ]:GNC 870 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
273 : : &err_code, &err_msg, &err_hint))
274 : : {
275 : : /*
276 : : * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 : : * and errhint(), since the messages from
278 : : * ReplicationSlotValidateNameInternal() are already translated. This
279 : : * avoids double translation.
280 : : */
220 fujii@postgresql.org 281 [ + - + + ]:CBC 5 : ereport(elevel,
282 : : errcode(err_code),
283 : : errmsg_internal("%s", err_msg),
284 : : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285 : :
220 fujii@postgresql.org 286 :UBC 0 : pfree(err_msg);
287 [ # # ]: 0 : if (err_hint != NULL)
288 : 0 : pfree(err_hint);
289 : 0 : return false;
290 : : }
291 : :
220 fujii@postgresql.org 292 :CBC 865 : return true;
293 : : }
294 : :
295 : : /*
296 : : * Check whether the passed slot name is valid.
297 : : *
298 : : * An error will be reported for a reserved replication slot name if
299 : : * allow_reserved_name is set to false.
300 : : *
301 : : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
302 : : * the name to be used as a directory name on every supported OS.
303 : : *
304 : : * Returns true if the slot name is valid. Otherwise, returns false and stores
305 : : * the error code, error message, and optional hint in err_code, err_msg, and
306 : : * err_hint, respectively. The caller is responsible for freeing err_msg and
307 : : * err_hint, which are palloc'd.
308 : : */
309 : : bool
220 fujii@postgresql.org 310 :GNC 1087 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
311 : : int *err_code, char **err_msg, char **err_hint)
312 : : {
313 : : const char *cp;
314 : :
4502 rhaas@postgresql.org 315 [ + + ]:CBC 1087 : if (strlen(name) == 0)
316 : : {
220 fujii@postgresql.org 317 : 4 : *err_code = ERRCODE_INVALID_NAME;
318 : 4 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 : 4 : *err_hint = NULL;
4502 rhaas@postgresql.org 320 : 4 : return false;
321 : : }
322 : :
323 [ - + ]: 1083 : if (strlen(name) >= NAMEDATALEN)
324 : : {
220 fujii@postgresql.org 325 :UBC 0 : *err_code = ERRCODE_NAME_TOO_LONG;
326 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 : 0 : *err_hint = NULL;
4502 rhaas@postgresql.org 328 : 0 : return false;
329 : : }
330 : :
4502 rhaas@postgresql.org 331 [ + + ]:CBC 20703 : for (cp = name; *cp; cp++)
332 : : {
333 [ + + - + ]: 19622 : if (!((*cp >= 'a' && *cp <= 'z')
334 [ + + + + ]: 9242 : || (*cp >= '0' && *cp <= '9')
335 [ + + ]: 1923 : || (*cp == '_')))
336 : : {
220 fujii@postgresql.org 337 : 2 : *err_code = ERRCODE_INVALID_NAME;
338 : 2 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 : 2 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
4502 rhaas@postgresql.org 340 : 2 : return false;
341 : : }
342 : : }
343 : :
311 akapila@postgresql.o 344 [ + + + + ]:GNC 1081 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
345 : : {
220 fujii@postgresql.org 346 : 1 : *err_code = ERRCODE_RESERVED_NAME;
347 : 1 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 : 1 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
349 : : CONFLICT_DETECTION_SLOT);
311 akapila@postgresql.o 350 : 1 : return false;
351 : : }
352 : :
4502 rhaas@postgresql.org 353 :CBC 1080 : return true;
354 : : }
355 : :
356 : : /*
357 : : * Return true if the replication slot name is "pg_conflict_detection".
358 : : */
359 : : static bool
311 akapila@postgresql.o 360 :GNC 2326 : IsSlotForConflictCheck(const char *name)
361 : : {
362 : 2326 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363 : : }
364 : :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: Stored in the slot's persistent data.  RS_TEMPORARY cannot be
 *     combined with failover except during slot synchronization.
 * two_phase: If enabled, allows decoding of prepared transactions.
 * repack: If true, use a slot from the pool for REPACK.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On return the slot is in_use, its active_proc is MyProcNumber and
 * MyReplicationSlot points to it.  Errors are raised for an invalid or
 * reserved name, disallowed failover settings, a name that already exists
 * anywhere in the array, or when no free slot exists in the selected range
 * (general slots are indexes [0, max_replication_slots); REPACK slots are
 * [max_replication_slots, max_replication_slots +
 * max_repack_replication_slots)).
 *
 * Lock order: ReplicationSlotAllocationLock (exclusive) is held for the whole
 * allocation; ReplicationSlotControlLock is taken inside it, first shared for
 * the scan and later exclusive to flip in_use.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool repack, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			startpoint,
				endpoint;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The logical launcher or pg_upgrade may create or migrate an internal
	 * slot, so using a reserved name is allowed in these cases.
	 */
	ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
								ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision (across the whole array), and identify an
	 * allocatable slot (in the array slice specific to our current use case:
	 * either general, or REPACK only). We need to hold
	 * ReplicationSlotControlLock in shared mode for this, so that nobody else
	 * can change the in_use flags while we're looking at them.
	 *
	 * [startpoint, endpoint) is the slice we may allocate from; the loop
	 * still visits every slot so that duplicates in the other slice are
	 * detected too.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	startpoint = !repack ? 0 : max_replication_slots;
	endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
	for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));

		/* remember the first free slot within our slice */
		if (i >= startpoint && i < endpoint &&
			!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"%s\".",
						 repack ? "max_repack_replication_slots" : "max_replication_slots")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;
	slot->slotsync_skip_reason = SS_SKIP_NONE;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);
	slot->active_proc = MyProcNumber;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
541 : :
542 : : /*
543 : : * Search for the named replication slot.
544 : : *
545 : : * Return the replication slot if found, otherwise NULL.
546 : : */
547 : : ReplicationSlot *
1859 akapila@postgresql.o 548 : 2108 : SearchNamedReplicationSlot(const char *name, bool need_lock)
549 : : {
550 : : int i;
551 : 2108 : ReplicationSlot *slot = NULL;
552 : :
553 [ + + ]: 2108 : if (need_lock)
554 : 641 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
555 : :
53 alvherre@kurilemu.de 556 [ + + ]:GNC 11128 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
557 : : {
4502 rhaas@postgresql.org 558 :CBC 10588 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
559 : :
560 [ + + + + ]: 10588 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 : : {
562 : 1568 : slot = s;
563 : 1568 : break;
564 : : }
565 : : }
566 : :
1859 akapila@postgresql.o 567 [ + + ]: 2108 : if (need_lock)
568 : 641 : LWLockRelease(ReplicationSlotControlLock);
569 : :
2171 fujii@postgresql.org 570 : 2108 : return slot;
571 : : }
572 : :
573 : : /*
574 : : * Return the index of the replication slot in
575 : : * ReplicationSlotCtl->replication_slots.
576 : : *
577 : : * This is mainly useful to have an efficient key for storing replication slot
578 : : * stats.
579 : : */
580 : : int
1515 andres@anarazel.de 581 : 8039 : ReplicationSlotIndex(ReplicationSlot *slot)
582 : : {
583 [ + - - + ]: 8039 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
584 : : slot < ReplicationSlotCtl->replication_slots +
585 : : (max_replication_slots + max_repack_replication_slots));
586 : :
587 : 8039 : return slot - ReplicationSlotCtl->replication_slots;
588 : : }
589 : :
590 : : /*
591 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 : : * the slot's name and true is returned.
593 : : *
594 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 : : * cases there are obvious TOCTOU issues.
596 : : */
597 : : bool
1330 598 : 112 : ReplicationSlotName(int index, Name name)
599 : : {
600 : : ReplicationSlot *slot;
601 : : bool found;
602 : :
603 : 112 : slot = &ReplicationSlotCtl->replication_slots[index];
604 : :
605 : : /*
606 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
607 : : * need the spinlock as the name of an existing slot cannot change.
608 : : */
609 : 112 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
610 : 112 : found = slot->in_use;
611 [ + - ]: 112 : if (slot->in_use)
612 : 112 : namestrcpy(name, NameStr(slot->data.name));
613 : 112 : LWLockRelease(ReplicationSlotControlLock);
614 : :
615 : 112 : return found;
616 : : }
617 : :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * Other errors: the slot does not exist, or it is the reserved
 * conflict-detection slot and the caller is not the logical replication
 * launcher.
 *
 * On success MyReplicationSlot points to the slot and its inactive_since is
 * reset.  Note that MyReplicationSlot is already set when the
 * error_if_invalid check fires, so the slot is owned by this backend at the
 * time that error is raised.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	ProcNumber	active_proc;
	int			active_pid;

	Assert(name != NULL);

	/* Re-entered after sleeping on a slot that another process held. */
retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * Do not allow users to acquire the reserved slot. This scenario may
	 * occur if the launcher that owns the slot has terminated unexpectedly
	 * due to an error, and a backend process attempts to reuse the slot.
	 *
	 * NOTE(review): unlike the branch above, this errors out while still
	 * holding ReplicationSlotControlLock, presumably relying on error
	 * recovery to release held LWLocks.
	 */
	if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
		ereport(ERROR,
				errcode(ERRCODE_UNDEFINED_OBJECT),
				errmsg("cannot acquire replication slot \"%s\"", name),
				errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 *
		 * Claim the slot only if nobody owns it; either way active_proc ends
		 * up holding the current owner, which is compared with MyProcNumber
		 * below.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_proc == INVALID_PROC_NUMBER)
			s->active_proc = MyProcNumber;
		active_proc = s->active_proc;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		/* single-user mode: no other process can own the slot */
		s->active_proc = active_proc = MyProcNumber;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	/* Look up the owner's PID while the control lock is still held. */
	active_pid = GetPGProcByNumber(active_proc)->pid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_proc != MyProcNumber)
	{
		if (!nowait)
		{
			/*
			 * Wait here until we get signaled, and then restart.
			 * NOTE(review): the wait event reported is
			 * WAIT_EVENT_REPLICATION_SLOT_DROP even though this waits for a
			 * release; confirm that is intended for monitoring.
			 */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);


	/* WAL senders log acquisition at LOG when log_replication_commands is on. */
	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
761 : :
762 : : /*
763 : : * Release the replication slot that this backend considers to own.
764 : : *
765 : : * This or another backend can re-acquire the slot later.
766 : : * Resources this slot requires will be preserved.
767 : : */
768 : : void
769 : 1665 : ReplicationSlotRelease(void)
770 : : {
771 : 1665 : ReplicationSlot *slot = MyReplicationSlot;
921 akapila@postgresql.o 772 : 1665 : char *slotname = NULL; /* keep compiler quiet */
773 : : bool is_logical;
796 774 : 1665 : TimestampTz now = 0;
775 : :
109 heikki.linnakangas@i 776 [ + - - + ]:GNC 1665 : Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777 : :
158 msawada@postgresql.o 778 : 1665 : is_logical = SlotIsLogical(slot);
779 : :
921 akapila@postgresql.o 780 [ + + ]:CBC 1665 : if (am_walsender)
781 : 1174 : slotname = pstrdup(NameStr(slot->data.name));
782 : :
4471 rhaas@postgresql.org 783 [ + + ]: 1665 : if (slot->data.persistency == RS_EPHEMERAL)
784 : : {
785 : : /*
786 : : * If slot is ephemeral, we drop it upon release, and request logical
787 : : * decoding be disabled.
788 : : */
3 alvherre@kurilemu.de 789 :GNC 6 : ReplicationSlotDropAcquired(is_logical);
790 : : }
791 : :
792 : : /*
793 : : * If slot needed to temporarily restrain both data and catalog xmin to
794 : : * create the catalog snapshot, remove that temporary constraint.
795 : : * Snapshots can only be exported while the initial snapshot is still
796 : : * acquired.
797 : : */
3324 andres@anarazel.de 798 [ + + ]:CBC 1665 : if (!TransactionIdIsValid(slot->data.xmin) &&
799 [ + + ]: 1632 : TransactionIdIsValid(slot->effective_xmin))
800 : : {
801 [ - + ]: 212 : SpinLockAcquire(&slot->mutex);
802 : 212 : slot->effective_xmin = InvalidTransactionId;
803 : 212 : SpinLockRelease(&slot->mutex);
804 : 212 : ReplicationSlotsComputeRequiredXmin(false);
805 : : }
806 : :
807 : : /*
808 : : * Set the time since the slot has become inactive. We get the current
809 : : * time beforehand to avoid system call while holding the spinlock.
810 : : */
785 akapila@postgresql.o 811 : 1665 : now = GetCurrentTimestamp();
812 : :
3231 alvherre@alvh.no-ip. 813 [ + + ]: 1665 : if (slot->data.persistency == RS_PERSISTENT)
814 : : {
815 : : /*
816 : : * Mark persistent slot inactive. We're not freeing it, just
817 : : * disconnecting, but wake up others that may be waiting for it.
818 : : */
819 [ - + ]: 1340 : SpinLockAcquire(&slot->mutex);
109 heikki.linnakangas@i 820 :GNC 1340 : slot->active_proc = INVALID_PROC_NUMBER;
479 akapila@postgresql.o 821 :CBC 1340 : ReplicationSlotSetInactiveSince(slot, now, false);
3231 alvherre@alvh.no-ip. 822 : 1340 : SpinLockRelease(&slot->mutex);
823 : 1340 : ConditionVariableBroadcast(&slot->active_cv);
824 : : }
825 : : else
479 akapila@postgresql.o 826 : 325 : ReplicationSlotSetInactiveSince(slot, now, true);
827 : :
4471 rhaas@postgresql.org 828 : 1665 : MyReplicationSlot = NULL;
829 : :
830 : : /* might not have been set when we've been a plain slot */
1661 alvherre@alvh.no-ip. 831 : 1665 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
2021 832 : 1665 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
833 : 1665 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
4471 rhaas@postgresql.org 834 : 1665 : LWLockRelease(ProcArrayLock);
835 : :
921 akapila@postgresql.o 836 [ + + ]: 1665 : if (am_walsender)
837 : : {
838 [ + - + - : 1174 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
839 : : is_logical
840 : : ? errmsg("released logical replication slot \"%s\"",
841 : : slotname)
842 : : : errmsg("released physical replication slot \"%s\"",
843 : : slotname));
844 : :
845 : 1174 : pfree(slotname);
846 : : }
4502 rhaas@postgresql.org 847 : 1665 : }
848 : :
849 : : /*
850 : : * Cleanup temporary slots created in current session.
851 : : *
852 : : * Cleanup only synced temporary slots if 'synced_only' is true, else
853 : : * cleanup all temporary slots.
854 : : *
855 : : * If it drops the last logical slot in the cluster, requests to disable
856 : : * logical decoding.
857 : : */
858 : : void
765 akapila@postgresql.o 859 : 54362 : ReplicationSlotCleanup(bool synced_only)
860 : : {
861 : : int i;
862 : : bool found_valid_logicalslot;
158 msawada@postgresql.o 863 :GNC 54362 : bool dropped_logical = false;
864 : :
3460 peter_e@gmx.net 865 [ + - ]:CBC 54362 : Assert(MyReplicationSlot == NULL);
866 : :
3231 alvherre@alvh.no-ip. 867 : 54362 : restart:
158 msawada@postgresql.o 868 :GNC 54522 : found_valid_logicalslot = false;
3231 alvherre@alvh.no-ip. 869 :CBC 54522 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 870 [ + + ]:GNC 864087 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
871 : : {
3460 peter_e@gmx.net 872 :CBC 809725 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
873 : :
3231 alvherre@alvh.no-ip. 874 [ + + ]: 809725 : if (!s->in_use)
875 : 794302 : continue;
876 : :
877 [ + + ]: 15423 : SpinLockAcquire(&s->mutex);
878 : :
158 msawada@postgresql.o 879 :GNC 30846 : found_valid_logicalslot |=
880 [ + + + + ]: 15423 : (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
881 : :
109 heikki.linnakangas@i 882 [ + + ]: 15423 : if ((s->active_proc == MyProcNumber &&
765 akapila@postgresql.o 883 [ - + - - ]:CBC 160 : (!synced_only || s->data.synced)))
884 : : {
3231 alvherre@alvh.no-ip. 885 [ - + ]: 160 : Assert(s->data.persistency == RS_TEMPORARY);
886 : 160 : SpinLockRelease(&s->mutex);
887 : 160 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
888 : :
158 msawada@postgresql.o 889 [ + + ]:GNC 160 : if (SlotIsLogical(s))
890 : 11 : dropped_logical = true;
891 : :
3460 peter_e@gmx.net 892 :CBC 160 : ReplicationSlotDropPtr(s);
893 : :
3231 alvherre@alvh.no-ip. 894 : 160 : ConditionVariableBroadcast(&s->active_cv);
895 : 160 : goto restart;
896 : : }
897 : : else
898 : 15263 : SpinLockRelease(&s->mutex);
899 : : }
900 : :
901 : 54362 : LWLockRelease(ReplicationSlotControlLock);
902 : :
158 msawada@postgresql.o 903 [ + + + + ]:GNC 54362 : if (dropped_logical && !found_valid_logicalslot)
904 : 4 : RequestDisableLogicalDecoding();
3460 peter_e@gmx.net 905 :CBC 54362 : }
906 : :
907 : : /*
908 : : * Permanently drop the replication slot identified by the passed-in name.
909 : : *
910 : : * If this is a logical slot, request that logical decoding be disabled.
911 : : */
912 : : void
3231 alvherre@alvh.no-ip. 913 : 441 : ReplicationSlotDrop(const char *name, bool nowait)
914 : : {
484 akapila@postgresql.o 915 : 441 : ReplicationSlotAcquire(name, nowait, false);
916 : :
917 : : /*
918 : : * Do not allow users to drop the slots which are currently being synced
919 : : * from the primary to the standby.
920 : : */
836 921 [ + + + + ]: 433 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
922 [ + - ]: 1 : ereport(ERROR,
923 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
924 : : errmsg("cannot drop replication slot \"%s\"", name),
925 : : errdetail("This replication slot is being synchronized from the primary server."));
926 : :
3 alvherre@kurilemu.de 927 :GNC 432 : ReplicationSlotDropAcquired(SlotIsLogical(MyReplicationSlot));
4471 rhaas@postgresql.org 928 :CBC 432 : }
929 : :
930 : : /*
931 : : * Change the definition of the slot identified by the specified name.
932 : : *
933 : : * Altering the two_phase property of a slot requires caution on the
934 : : * client-side. Enabling it at any random point during decoding has the
935 : : * risk that transactions prepared before this change may be skipped by
936 : : * the decoder, leading to missing prepare records on the client. So, we
937 : : * enable it for subscription related slots only once the initial tablesync
938 : : * is finished. See comments atop worker.c. Disabling it is safe only when
939 : : * there are no pending prepared transaction, otherwise, the changes of
940 : : * already prepared transactions can be replicated again along with their
941 : : * corresponding commit leading to duplicate data or errors.
942 : : */
943 : : void
675 akapila@postgresql.o 944 : 7 : ReplicationSlotAlter(const char *name, const bool *failover,
945 : : const bool *two_phase)
946 : : {
947 : 7 : bool update_slot = false;
948 : :
852 949 [ - + ]: 7 : Assert(MyReplicationSlot == NULL);
675 950 [ + + - + ]: 7 : Assert(failover || two_phase);
951 : :
484 952 : 7 : ReplicationSlotAcquire(name, false, true);
953 : :
852 954 [ - + ]: 6 : if (SlotIsPhysical(MyReplicationSlot))
852 akapila@postgresql.o 955 [ # # ]:UBC 0 : ereport(ERROR,
956 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
957 : : errmsg("cannot use %s with a physical replication slot",
958 : : "ALTER_REPLICATION_SLOT"));
959 : :
836 akapila@postgresql.o 960 [ + + ]:CBC 6 : if (RecoveryInProgress())
961 : : {
962 : : /*
963 : : * Do not allow users to alter the slots which are currently being
964 : : * synced from the primary to the standby.
965 : : */
966 [ + - ]: 1 : if (MyReplicationSlot->data.synced)
967 [ + - ]: 1 : ereport(ERROR,
968 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
969 : : errmsg("cannot alter replication slot \"%s\"", name),
970 : : errdetail("This replication slot is being synchronized from the primary server."));
971 : :
972 : : /*
973 : : * Do not allow users to enable failover on the standby as we do not
974 : : * support sync to the cascading standby.
975 : : */
675 akapila@postgresql.o 976 [ # # # # ]:UBC 0 : if (failover && *failover)
836 977 [ # # ]: 0 : ereport(ERROR,
978 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
979 : : errmsg("cannot enable failover for a replication slot"
980 : : " on the standby"));
981 : : }
982 : :
675 akapila@postgresql.o 983 [ + + ]:CBC 5 : if (failover)
984 : : {
985 : : /*
986 : : * Do not allow users to enable failover for temporary slots as we do
987 : : * not support syncing temporary slots to the standby.
988 : : */
989 [ + + - + ]: 4 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
675 akapila@postgresql.o 990 [ # # ]:UBC 0 : ereport(ERROR,
991 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
992 : : errmsg("cannot enable failover for a temporary replication slot"));
993 : :
675 akapila@postgresql.o 994 [ + - ]:CBC 4 : if (MyReplicationSlot->data.failover != *failover)
995 : : {
996 [ - + ]: 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
997 : 4 : MyReplicationSlot->data.failover = *failover;
998 : 4 : SpinLockRelease(&MyReplicationSlot->mutex);
999 : :
1000 : 4 : update_slot = true;
1001 : : }
1002 : : }
1003 : :
1004 [ + + + - ]: 5 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
1005 : : {
843 1006 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
675 1007 : 1 : MyReplicationSlot->data.two_phase = *two_phase;
843 1008 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1009 : :
675 1010 : 1 : update_slot = true;
1011 : : }
1012 : :
1013 [ + - ]: 5 : if (update_slot)
1014 : : {
843 1015 : 5 : ReplicationSlotMarkDirty();
1016 : 5 : ReplicationSlotSave();
1017 : : }
1018 : :
852 1019 : 5 : ReplicationSlotRelease();
1020 : 5 : }
1021 : :
1022 : : /*
1023 : : * Permanently drop the currently acquired replication slot.
1024 : : *
1025 : : * If caller requests it, have checkpointer attempt to disable logical
1026 : : * decoding. Obviously, this should only be done if the slot is logical.
1027 : : */
1028 : : void
3 alvherre@kurilemu.de 1029 :GNC 453 : ReplicationSlotDropAcquired(bool try_disable)
1030 : : {
1031 : : ReplicationSlot *slot;
1032 : :
4471 rhaas@postgresql.org 1033 [ - + ]:CBC 453 : Assert(MyReplicationSlot != NULL);
3 alvherre@kurilemu.de 1034 :GNC 453 : slot = MyReplicationSlot;
1035 : :
1036 : : /* Can only disable logical decoding if slot is logical */
1037 [ + + - + ]: 453 : Assert(!try_disable || SlotIsLogical(slot));
1038 : :
1039 : : /* slot isn't acquired anymore */
4471 rhaas@postgresql.org 1040 :CBC 453 : MyReplicationSlot = NULL;
1041 : :
3460 peter_e@gmx.net 1042 : 453 : ReplicationSlotDropPtr(slot);
1043 : :
3 alvherre@kurilemu.de 1044 [ + + ]:GNC 453 : if (try_disable)
1045 : 425 : RequestDisableLogicalDecoding();
3460 peter_e@gmx.net 1046 :CBC 453 : }
1047 : :
1048 : : /*
1049 : : * Permanently drop the replication slot which will be released by the point
1050 : : * this function returns.
1051 : : */
1052 : : static void
1053 : 613 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1054 : : {
1055 : : char path[MAXPGPATH];
1056 : : char tmppath[MAXPGPATH];
1057 : :
1058 : : /*
1059 : : * If some other backend ran this code concurrently with us, we might try
1060 : : * to delete a slot with a certain name while someone else was trying to
1061 : : * create a slot with the same name.
1062 : : */
4502 rhaas@postgresql.org 1063 : 613 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1064 : :
1065 : : /* Generate pathnames. */
638 michael@paquier.xyz 1066 : 613 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1067 : 613 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1068 : :
1069 : : /*
1070 : : * Rename the slot directory on disk, so that we'll no longer recognize
1071 : : * this as a valid slot. Note that if this fails, we've got to mark the
1072 : : * slot inactive before bailing out. If we're dropping an ephemeral or a
1073 : : * temporary slot, we better never fail hard as the caller won't expect
1074 : : * the slot to survive and this might get called during error handling.
1075 : : */
4471 rhaas@postgresql.org 1076 [ + - ]: 613 : if (rename(path, tmppath) == 0)
1077 : : {
1078 : : /*
1079 : : * We need to fsync() the directory we just renamed and its parent to
1080 : : * make sure that our changes are on disk in a crash-safe fashion. If
1081 : : * fsync() fails, we can't be sure whether the changes are on disk or
1082 : : * not. For now, we handle that by panicking;
1083 : : * StartupReplicationSlots() will try to straighten it out after
1084 : : * restart.
1085 : : */
1086 : 613 : START_CRIT_SECTION();
1087 : 613 : fsync_fname(tmppath, true);
638 michael@paquier.xyz 1088 : 613 : fsync_fname(PG_REPLSLOT_DIR, true);
4471 rhaas@postgresql.org 1089 [ - + ]: 613 : END_CRIT_SECTION();
1090 : : }
1091 : : else
1092 : : {
3460 peter_e@gmx.net 1093 :UBC 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1094 : :
4502 rhaas@postgresql.org 1095 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
109 heikki.linnakangas@i 1096 :UNC 0 : slot->active_proc = INVALID_PROC_NUMBER;
4502 rhaas@postgresql.org 1097 :UBC 0 : SpinLockRelease(&slot->mutex);
1098 : :
1099 : : /* wake up anyone waiting on this slot */
3231 alvherre@alvh.no-ip. 1100 : 0 : ConditionVariableBroadcast(&slot->active_cv);
1101 : :
4471 rhaas@postgresql.org 1102 [ # # # # ]: 0 : ereport(fail_softly ? WARNING : ERROR,
1103 : : (errcode_for_file_access(),
1104 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1105 : : path, tmppath)));
1106 : : }
1107 : :
1108 : : /*
1109 : : * The slot is definitely gone. Lock out concurrent scans of the array
1110 : : * long enough to kill it. It's OK to clear the active PID here without
1111 : : * grabbing the mutex because nobody else can be scanning the array here,
1112 : : * and nobody can be attached to this slot and thus access it without
1113 : : * scanning the array.
1114 : : *
1115 : : * Also wake up processes waiting for it.
1116 : : */
4502 rhaas@postgresql.org 1117 :CBC 613 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
109 heikki.linnakangas@i 1118 :GNC 613 : slot->active_proc = INVALID_PROC_NUMBER;
4502 rhaas@postgresql.org 1119 :CBC 613 : slot->in_use = false;
1120 : 613 : LWLockRelease(ReplicationSlotControlLock);
3231 alvherre@alvh.no-ip. 1121 : 613 : ConditionVariableBroadcast(&slot->active_cv);
1122 : :
1123 : : /*
1124 : : * Slot is dead and doesn't prevent resource removal anymore, recompute
1125 : : * limits.
1126 : : */
4471 rhaas@postgresql.org 1127 : 613 : ReplicationSlotsComputeRequiredXmin(false);
4502 1128 : 613 : ReplicationSlotsComputeRequiredLSN();
1129 : :
1130 : : /*
1131 : : * If removing the directory fails, the worst thing that will happen is
1132 : : * that the user won't be able to create a new slot with the same name
1133 : : * until the next server restart. We warn about it, but that's all.
1134 : : */
1135 [ - + ]: 613 : if (!rmtree(tmppath, true))
4502 rhaas@postgresql.org 1136 [ # # ]:UBC 0 : ereport(WARNING,
1137 : : (errmsg("could not remove directory \"%s\"", tmppath)));
1138 : :
1139 : : /*
1140 : : * Drop the statistics entry for the replication slot. Do this while
1141 : : * holding ReplicationSlotAllocationLock so that we don't drop a
1142 : : * statistics entry for another slot with the same name just created in
1143 : : * another session.
1144 : : */
2060 akapila@postgresql.o 1145 [ + + ]:CBC 613 : if (SlotIsLogical(slot))
1515 andres@anarazel.de 1146 : 443 : pgstat_drop_replslot(slot);
1147 : :
1148 : : /*
1149 : : * We release this at the very end, so that nobody starts trying to create
1150 : : * a slot while we're still cleaning up the detritus of the old one.
1151 : : */
4502 rhaas@postgresql.org 1152 : 613 : LWLockRelease(ReplicationSlotAllocationLock);
1153 : 613 : }
1154 : :
1155 : : /*
1156 : : * Serialize the currently acquired slot's state from memory to disk, thereby
1157 : : * guaranteeing the current state will survive a crash.
1158 : : */
1159 : : void
1160 : 1498 : ReplicationSlotSave(void)
1161 : : {
1162 : : char path[MAXPGPATH];
1163 : :
1164 [ - + ]: 1498 : Assert(MyReplicationSlot != NULL);
1165 : :
638 michael@paquier.xyz 1166 : 1498 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
4502 rhaas@postgresql.org 1167 : 1498 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1168 : 1498 : }
1169 : :
1170 : : /*
1171 : : * Signal that it would be useful if the currently acquired slot would be
1172 : : * flushed out to disk.
1173 : : *
1174 : : * Note that the actual flush to disk can be delayed for a long time, if
1175 : : * required for correctness explicitly do a ReplicationSlotSave().
1176 : : */
1177 : : void
1178 : 72352 : ReplicationSlotMarkDirty(void)
1179 : : {
3889 1180 : 72352 : ReplicationSlot *slot = MyReplicationSlot;
1181 : :
4502 1182 [ - + ]: 72352 : Assert(MyReplicationSlot != NULL);
1183 : :
3889 1184 [ - + ]: 72352 : SpinLockAcquire(&slot->mutex);
1185 : 72352 : MyReplicationSlot->just_dirtied = true;
1186 : 72352 : MyReplicationSlot->dirty = true;
1187 : 72352 : SpinLockRelease(&slot->mutex);
4502 1188 : 72352 : }
1189 : :
1190 : : /*
1191 : : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1192 : : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1193 : : */
1194 : : void
4471 1195 : 489 : ReplicationSlotPersist(void)
1196 : : {
1197 : 489 : ReplicationSlot *slot = MyReplicationSlot;
1198 : :
1199 [ - + ]: 489 : Assert(slot != NULL);
1200 [ - + ]: 489 : Assert(slot->data.persistency != RS_PERSISTENT);
1201 : :
3889 1202 [ - + ]: 489 : SpinLockAcquire(&slot->mutex);
1203 : 489 : slot->data.persistency = RS_PERSISTENT;
1204 : 489 : SpinLockRelease(&slot->mutex);
1205 : :
4471 1206 : 489 : ReplicationSlotMarkDirty();
1207 : 489 : ReplicationSlotSave();
1208 : 489 : }
1209 : :
1210 : : /*
1211 : : * Compute the oldest xmin across all slots and store it in the ProcArray.
1212 : : *
1213 : : * If already_locked is true, both the ReplicationSlotControlLock and the
1214 : : * ProcArrayLock have already been acquired exclusively. It is crucial that the
1215 : : * caller first acquires the ReplicationSlotControlLock, followed by the
1216 : : * ProcArrayLock, to prevent any undetectable deadlocks since this function
1217 : : * acquires them in that order.
1218 : : */
1219 : : void
1220 : 2620 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1221 : : {
1222 : : int i;
4502 1223 : 2620 : TransactionId agg_xmin = InvalidTransactionId;
4471 1224 : 2620 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1225 : :
4502 1226 [ - + ]: 2620 : Assert(ReplicationSlotCtl != NULL);
151 msawada@postgresql.o 1227 [ + + + - : 2620 : Assert(!already_locked ||
- + ]
1228 : : (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1229 : : LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1230 : :
1231 : : /*
1232 : : * Hold the ReplicationSlotControlLock until after updating the slot xmin
1233 : : * values, so no backend updates the initial xmin for newly created slot
1234 : : * concurrently. A shared lock is used here to minimize lock contention,
1235 : : * especially when many slots exist and advancements occur frequently.
1236 : : * This is safe since an exclusive lock is taken during initial slot xmin
1237 : : * update in slot creation.
1238 : : *
1239 : : * One might think that we can hold the ProcArrayLock exclusively and
1240 : : * update the slot xmin values, but it could increase lock contention on
1241 : : * the ProcArrayLock, which is not great since this function can be called
1242 : : * at non-negligible frequency.
1243 : : *
1244 : : * Concurrent invocation of this function may cause the computed slot xmin
1245 : : * to regress. However, this is harmless because tuples prior to the most
1246 : : * recent xmin are no longer useful once advancement occurs (see
1247 : : * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1248 : : * before updating the effective_xmin). Thus, such regression merely
1249 : : * prevents VACUUM from prematurely removing tuples without causing the
1250 : : * early deletion of required data.
1251 : : */
1252 [ + + ]: 2620 : if (!already_locked)
1253 : 2106 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1254 : :
53 alvherre@kurilemu.de 1255 [ + + ]:GNC 39811 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1256 : : {
4502 rhaas@postgresql.org 1257 :CBC 37191 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1258 : : TransactionId effective_xmin;
1259 : : TransactionId effective_catalog_xmin;
1260 : : bool invalidated;
1261 : :
1262 [ + + ]: 37191 : if (!s->in_use)
1263 : 34678 : continue;
1264 : :
3889 1265 [ - + ]: 2513 : SpinLockAcquire(&s->mutex);
1266 : 2513 : effective_xmin = s->effective_xmin;
1267 : 2513 : effective_catalog_xmin = s->effective_catalog_xmin;
1149 andres@anarazel.de 1268 : 2513 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3889 rhaas@postgresql.org 1269 : 2513 : SpinLockRelease(&s->mutex);
1270 : :
1271 : : /* invalidated slots need not apply */
1285 alvherre@alvh.no-ip. 1272 [ + + ]: 2513 : if (invalidated)
1273 : 26 : continue;
1274 : :
1275 : : /* check the data xmin */
4502 rhaas@postgresql.org 1276 [ + + + + ]: 2487 : if (TransactionIdIsValid(effective_xmin) &&
1277 [ - + ]: 6 : (!TransactionIdIsValid(agg_xmin) ||
1278 : 6 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1279 : 336 : agg_xmin = effective_xmin;
1280 : :
1281 : : /* check the catalog xmin */
4471 1282 [ + + + + ]: 2487 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1283 [ + + ]: 1060 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1284 : 1060 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1285 : 1261 : agg_catalog_xmin = effective_catalog_xmin;
1286 : : }
1287 : :
1288 : 2620 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1289 : :
151 msawada@postgresql.o 1290 [ + + ]: 2620 : if (!already_locked)
1291 : 2106 : LWLockRelease(ReplicationSlotControlLock);
4502 rhaas@postgresql.org 1292 : 2620 : }
1293 : :
1294 : : /*
1295 : : * Compute the oldest restart LSN across all slots and inform xlog module.
1296 : : *
1297 : : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1298 : : * purpose, we don't try to account for that, because this module doesn't
1299 : : * know what to compare against.
1300 : : */
1301 : : void
1302 : 73421 : ReplicationSlotsComputeRequiredLSN(void)
1303 : : {
1304 : : int i;
4407 bruce@momjian.us 1305 : 73421 : XLogRecPtr min_required = InvalidXLogRecPtr;
1306 : :
4502 rhaas@postgresql.org 1307 [ - + ]: 73421 : Assert(ReplicationSlotCtl != NULL);
1308 : :
1309 : 73421 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 1310 [ + + ]:GNC 1170176 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1311 : : {
4502 rhaas@postgresql.org 1312 :CBC 1096755 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1313 : : XLogRecPtr restart_lsn;
1314 : : XLogRecPtr last_saved_restart_lsn;
1315 : : bool invalidated;
1316 : : ReplicationSlotPersistency persistency;
1317 : :
1318 [ + + ]: 1096755 : if (!s->in_use)
1319 : 1022876 : continue;
1320 : :
3889 1321 [ - + ]: 73879 : SpinLockAcquire(&s->mutex);
350 akorotkov@postgresql 1322 : 73879 : persistency = s->data.persistency;
3889 rhaas@postgresql.org 1323 : 73879 : restart_lsn = s->data.restart_lsn;
1149 andres@anarazel.de 1324 : 73879 : invalidated = s->data.invalidated != RS_INVAL_NONE;
350 akorotkov@postgresql 1325 : 73879 : last_saved_restart_lsn = s->last_saved_restart_lsn;
3889 rhaas@postgresql.org 1326 : 73879 : SpinLockRelease(&s->mutex);
1327 : :
1328 : : /* invalidated slots need not apply */
1149 andres@anarazel.de 1329 [ + + ]: 73879 : if (invalidated)
1330 : 27 : continue;
1331 : :
1332 : : /*
1333 : : * For persistent slot use last_saved_restart_lsn to compute the
1334 : : * oldest LSN for removal of WAL segments. The segments between
1335 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1336 : : * persistent slot in the case of database crash. Non-persistent
1337 : : * slots can't survive the database crash, so we don't care about
1338 : : * last_saved_restart_lsn for them.
1339 : : */
350 akorotkov@postgresql 1340 [ + + ]: 73852 : if (persistency == RS_PERSISTENT)
1341 : : {
205 alvherre@kurilemu.de 1342 [ + + + + ]:GNC 72854 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1343 : : restart_lsn > last_saved_restart_lsn)
1344 : : {
350 akorotkov@postgresql 1345 :CBC 69577 : restart_lsn = last_saved_restart_lsn;
1346 : : }
1347 : : }
1348 : :
205 alvherre@kurilemu.de 1349 [ + + + + ]:GNC 73852 : if (XLogRecPtrIsValid(restart_lsn) &&
1350 [ + + ]: 1649 : (!XLogRecPtrIsValid(min_required) ||
1351 : : restart_lsn < min_required))
4502 rhaas@postgresql.org 1352 :CBC 72363 : min_required = restart_lsn;
1353 : : }
1354 : 73421 : LWLockRelease(ReplicationSlotControlLock);
1355 : :
1356 : 73421 : XLogSetReplicationSlotMinimumLSN(min_required);
1357 : 73421 : }
1358 : :
1359 : : /*
1360 : : * Compute the oldest WAL LSN required by *logical* decoding slots..
1361 : : *
1362 : : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1363 : : * slots exist.
1364 : : *
1365 : : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1366 : : * ignores physical replication slots.
1367 : : *
1368 : : * The results aren't required frequently, so we don't maintain a precomputed
1369 : : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1370 : : */
1371 : : XLogRecPtr
4471 1372 : 3900 : ReplicationSlotsComputeLogicalRestartLSN(void)
1373 : : {
1374 : 3900 : XLogRecPtr result = InvalidXLogRecPtr;
1375 : : int i;
1376 : :
53 alvherre@kurilemu.de 1377 [ - + ]:GNC 3900 : if (max_replication_slots + max_repack_replication_slots <= 0)
4471 rhaas@postgresql.org 1378 :UBC 0 : return InvalidXLogRecPtr;
1379 : :
4471 rhaas@postgresql.org 1380 :CBC 3900 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1381 : :
53 alvherre@kurilemu.de 1382 [ + + ]:GNC 61838 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1383 : : {
1384 : : ReplicationSlot *s;
1385 : : XLogRecPtr restart_lsn;
1386 : : XLogRecPtr last_saved_restart_lsn;
1387 : : bool invalidated;
1388 : : ReplicationSlotPersistency persistency;
1389 : :
4471 rhaas@postgresql.org 1390 :CBC 57938 : s = &ReplicationSlotCtl->replication_slots[i];
1391 : :
1392 : : /* cannot change while ReplicationSlotCtlLock is held */
1393 [ + + ]: 57938 : if (!s->in_use)
1394 : 57112 : continue;
1395 : :
1396 : : /* we're only interested in logical slots */
3945 andres@anarazel.de 1397 [ + + ]: 826 : if (!SlotIsLogical(s))
4471 rhaas@postgresql.org 1398 : 566 : continue;
1399 : :
1400 : : /* read once, it's ok if it increases while we're checking */
1401 [ - + ]: 260 : SpinLockAcquire(&s->mutex);
350 akorotkov@postgresql 1402 : 260 : persistency = s->data.persistency;
4471 rhaas@postgresql.org 1403 : 260 : restart_lsn = s->data.restart_lsn;
1149 andres@anarazel.de 1404 : 260 : invalidated = s->data.invalidated != RS_INVAL_NONE;
350 akorotkov@postgresql 1405 : 260 : last_saved_restart_lsn = s->last_saved_restart_lsn;
4471 rhaas@postgresql.org 1406 : 260 : SpinLockRelease(&s->mutex);
1407 : :
1408 : : /* invalidated slots need not apply */
1149 andres@anarazel.de 1409 [ + + ]: 260 : if (invalidated)
1410 : 10 : continue;
1411 : :
1412 : : /*
1413 : : * For persistent slot use last_saved_restart_lsn to compute the
1414 : : * oldest LSN for removal of WAL segments. The segments between
1415 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1416 : : * persistent slot in the case of database crash. Non-persistent
1417 : : * slots can't survive the database crash, so we don't care about
1418 : : * last_saved_restart_lsn for them.
1419 : : */
350 akorotkov@postgresql 1420 [ + + ]: 250 : if (persistency == RS_PERSISTENT)
1421 : : {
205 alvherre@kurilemu.de 1422 [ + - - + ]:GNC 248 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1423 : : restart_lsn > last_saved_restart_lsn)
1424 : : {
350 akorotkov@postgresql 1425 :UBC 0 : restart_lsn = last_saved_restart_lsn;
1426 : : }
1427 : : }
1428 : :
205 alvherre@kurilemu.de 1429 [ - + ]:GNC 250 : if (!XLogRecPtrIsValid(restart_lsn))
2244 alvherre@alvh.no-ip. 1430 :UBC 0 : continue;
1431 : :
205 alvherre@kurilemu.de 1432 [ + + + + ]:GNC 250 : if (!XLogRecPtrIsValid(result) ||
1433 : : restart_lsn < result)
4471 rhaas@postgresql.org 1434 :CBC 192 : result = restart_lsn;
1435 : : }
1436 : :
1437 : 3900 : LWLockRelease(ReplicationSlotControlLock);
1438 : :
1439 : 3900 : return result;
1440 : : }
1441 : :
1442 : : /*
1443 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1444 : : * passed database oid.
1445 : : *
1446 : : * Returns true if there are any slots referencing the database. *nslots will
1447 : : * be set to the absolute number of slots in the database, *nactive to ones
1448 : : * currently active.
1449 : : */
1450 : : bool
1451 : 51 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1452 : : {
1453 : : int i;
1454 : :
1455 : 51 : *nslots = *nactive = 0;
1456 : :
53 alvherre@kurilemu.de 1457 [ - + ]:GNC 51 : if (max_replication_slots + max_repack_replication_slots <= 0)
4471 rhaas@postgresql.org 1458 :UBC 0 : return false;
1459 : :
4471 rhaas@postgresql.org 1460 :CBC 51 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 1461 [ + + ]:GNC 789 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1462 : : {
1463 : : ReplicationSlot *s;
1464 : :
4471 rhaas@postgresql.org 1465 :CBC 738 : s = &ReplicationSlotCtl->replication_slots[i];
1466 : :
1467 : : /* cannot change while ReplicationSlotCtlLock is held */
1468 [ + + ]: 738 : if (!s->in_use)
1469 : 717 : continue;
1470 : :
1471 : : /* only logical slots are database specific, skip */
3945 andres@anarazel.de 1472 [ + + ]: 21 : if (!SlotIsLogical(s))
4467 bruce@momjian.us 1473 : 10 : continue;
1474 : :
1475 : : /* not our database, skip */
4471 rhaas@postgresql.org 1476 [ + + ]: 11 : if (s->data.database != dboid)
1477 : 8 : continue;
1478 : :
1479 : : /* NB: intentionally counting invalidated slots */
1480 : :
1481 : : /* count slots with spinlock held */
1482 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
1483 : 3 : (*nslots)++;
109 heikki.linnakangas@i 1484 [ + + ]:GNC 3 : if (s->active_proc != INVALID_PROC_NUMBER)
4471 rhaas@postgresql.org 1485 :CBC 1 : (*nactive)++;
1486 : 3 : SpinLockRelease(&s->mutex);
1487 : : }
1488 : 51 : LWLockRelease(ReplicationSlotControlLock);
1489 : :
1490 [ + + ]: 51 : if (*nslots > 0)
1491 : 3 : return true;
1492 : 48 : return false;
1493 : : }
1494 : :
1495 : : /*
1496 : : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1497 : : * passed database oid. The caller should hold an exclusive lock on the
1498 : : * pg_database oid for the database to prevent creation of new slots on the db
1499 : : * or replay from existing slots.
1500 : : *
1501 : : * Another session that concurrently acquires an existing slot on the target DB
1502 : : * (most likely to drop it) may cause this function to ERROR. If that happens
1503 : : * it may have dropped some but not all slots.
1504 : : *
1505 : : * This routine isn't as efficient as it could be - but we don't drop
1506 : : * databases often, especially databases with lots of slots.
1507 : : *
1508 : : * If the last logical slot in the cluster is dropped, request to disable
1509 : : * logical decoding.
1510 : : */
1511 : : void
3350 simon@2ndQuadrant.co 1512 : 64 : ReplicationSlotsDropDBSlots(Oid dboid)
1513 : : {
1514 : : int i;
1515 : : bool found_valid_logicalslot;
158 msawada@postgresql.o 1516 :GNC 64 : bool dropped = false;
1517 : :
53 alvherre@kurilemu.de 1518 [ + - ]: 64 : if (max_replication_slots + max_repack_replication_slots <= 0)
3350 simon@2ndQuadrant.co 1519 :UBC 0 : return;
1520 : :
3350 simon@2ndQuadrant.co 1521 :CBC 64 : restart:
158 msawada@postgresql.o 1522 :GNC 69 : found_valid_logicalslot = false;
3350 simon@2ndQuadrant.co 1523 :CBC 69 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 1524 [ + + ]:GNC 992 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1525 : : {
1526 : : ReplicationSlot *s;
1527 : : char *slotname;
1528 : : ProcNumber active_proc;
1529 : :
3350 simon@2ndQuadrant.co 1530 :CBC 928 : s = &ReplicationSlotCtl->replication_slots[i];
1531 : :
1532 : : /* cannot change while ReplicationSlotCtlLock is held */
1533 [ + + ]: 928 : if (!s->in_use)
1534 : 900 : continue;
1535 : :
1536 : : /* only logical slots are database specific, skip */
1537 [ + + ]: 28 : if (!SlotIsLogical(s))
1538 : 11 : continue;
1539 : :
1540 : : /*
1541 : : * Check logical slots on other databases too so we can disable
1542 : : * logical decoding only if no slots in the cluster.
1543 : : */
158 msawada@postgresql.o 1544 :GNC 17 : SpinLockAcquire(&s->mutex);
1545 : 17 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1546 : 17 : SpinLockRelease(&s->mutex);
1547 : :
1548 : : /* not our database, skip */
3350 simon@2ndQuadrant.co 1549 [ + + ]:CBC 17 : if (s->data.database != dboid)
1550 : 12 : continue;
1551 : :
1552 : : /* NB: intentionally including invalidated slots to drop */
1553 : :
1554 : : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1555 [ - + ]: 5 : SpinLockAcquire(&s->mutex);
1556 : : /* can't change while ReplicationSlotControlLock is held */
3342 andres@anarazel.de 1557 : 5 : slotname = NameStr(s->data.name);
109 heikki.linnakangas@i 1558 :GNC 5 : active_proc = s->active_proc;
1559 [ + - ]: 5 : if (active_proc == INVALID_PROC_NUMBER)
1560 : : {
3350 simon@2ndQuadrant.co 1561 :CBC 5 : MyReplicationSlot = s;
109 heikki.linnakangas@i 1562 :GNC 5 : s->active_proc = MyProcNumber;
1563 : : }
3350 simon@2ndQuadrant.co 1564 :CBC 5 : SpinLockRelease(&s->mutex);
1565 : :
1566 : : /*
1567 : : * Even though we hold an exclusive lock on the database object a
1568 : : * logical slot for that DB can still be active, e.g. if it's
1569 : : * concurrently being dropped by a backend connected to another DB.
1570 : : *
1571 : : * That's fairly unlikely in practice, so we'll just bail out.
1572 : : *
1573 : : * The slot sync worker holds a shared lock on the database before
1574 : : * operating on synced logical slots to avoid conflict with the drop
1575 : : * happening here. The persistent synced slots are thus safe but there
1576 : : * is a possibility that the slot sync worker has created a temporary
1577 : : * slot (which stays active even on release) and we are trying to drop
1578 : : * that here. In practice, the chances of hitting this scenario are
1579 : : * less as during slot synchronization, the temporary slot is
1580 : : * immediately converted to persistent and thus is safe due to the
1581 : : * shared lock taken on the database. So, we'll just bail out in such
1582 : : * a case.
1583 : : *
1584 : : * XXX: We can consider shutting down the slot sync worker before
1585 : : * trying to drop synced temporary slots here.
1586 : : */
109 heikki.linnakangas@i 1587 [ - + ]:GNC 5 : if (active_proc != INVALID_PROC_NUMBER)
3342 andres@anarazel.de 1588 [ # # ]:UBC 0 : ereport(ERROR,
1589 : : (errcode(ERRCODE_OBJECT_IN_USE),
1590 : : errmsg("replication slot \"%s\" is active for PID %d",
1591 : : slotname, GetPGProcByNumber(active_proc)->pid)));
1592 : :
1593 : : /*
1594 : : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1595 : : * holding ReplicationSlotControlLock over filesystem operations,
1596 : : * release ReplicationSlotControlLock and use
1597 : : * ReplicationSlotDropAcquired.
1598 : : *
1599 : : * As that means the set of slots could change, restart scan from the
1600 : : * beginning each time we release the lock.
1601 : : */
3350 simon@2ndQuadrant.co 1602 :CBC 5 : LWLockRelease(ReplicationSlotControlLock);
3 alvherre@kurilemu.de 1603 :GNC 5 : ReplicationSlotDropAcquired(false);
158 msawada@postgresql.o 1604 : 5 : dropped = true;
3350 simon@2ndQuadrant.co 1605 :CBC 5 : goto restart;
1606 : : }
1607 : 64 : LWLockRelease(ReplicationSlotControlLock);
1608 : :
158 msawada@postgresql.o 1609 [ + + - + ]:GNC 64 : if (dropped && !found_valid_logicalslot)
158 msawada@postgresql.o 1610 :UNC 0 : RequestDisableLogicalDecoding();
1611 : : }
1612 : :
1613 : : /*
1614 : : * Returns true if there is at least one in-use valid logical replication slot.
1615 : : */
1616 : : bool
158 msawada@postgresql.o 1617 :GNC 502 : CheckLogicalSlotExists(void)
1618 : : {
1619 : 502 : bool found = false;
1620 : :
53 alvherre@kurilemu.de 1621 [ - + ]: 502 : if (max_replication_slots + max_repack_replication_slots <= 0)
158 msawada@postgresql.o 1622 :UNC 0 : return false;
1623 : :
158 msawada@postgresql.o 1624 :GNC 502 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 1625 [ + + ]: 7908 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1626 : : {
1627 : : ReplicationSlot *s;
1628 : : bool invalidated;
1629 : :
158 msawada@postgresql.o 1630 : 7412 : s = &ReplicationSlotCtl->replication_slots[i];
1631 : :
1632 : : /* cannot change while ReplicationSlotCtlLock is held */
1633 [ + + ]: 7412 : if (!s->in_use)
1634 : 7383 : continue;
1635 : :
1636 [ + + ]: 29 : if (SlotIsPhysical(s))
1637 : 20 : continue;
1638 : :
1639 : 9 : SpinLockAcquire(&s->mutex);
1640 : 9 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1641 : 9 : SpinLockRelease(&s->mutex);
1642 : :
1643 [ + + ]: 9 : if (invalidated)
1644 : 3 : continue;
1645 : :
1646 : 6 : found = true;
1647 : 6 : break;
1648 : : }
1649 : 502 : LWLockRelease(ReplicationSlotControlLock);
1650 : :
1651 : 502 : return found;
1652 : : }
1653 : :
1654 : : /*
1655 : : * Check whether the server's configuration supports using replication
1656 : : * slots.
1657 : : */
1658 : : void
53 alvherre@kurilemu.de 1659 : 1891 : CheckSlotRequirements(bool repack)
1660 : : {
1661 : : /*
1662 : : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1663 : : * needs the same check.
1664 : : */
1665 : :
1666 [ + + - + ]: 1891 : if (!repack && max_replication_slots == 0)
4502 rhaas@postgresql.org 1667 [ # # ]:UBC 0 : ereport(ERROR,
1668 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1669 : : errmsg("replication slots can only be used if \"%s\" > 0",
1670 : : "max_replication_slots"));
1671 : :
53 alvherre@kurilemu.de 1672 [ + + - + ]:GNC 1891 : if (repack && max_repack_replication_slots == 0)
53 alvherre@kurilemu.de 1673 [ # # ]:UNC 0 : ereport(ERROR,
1674 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1675 : : errmsg("REPACK can only be used if \"%s\" > 0",
1676 : : "max_repack_replication_slots"));
1677 : :
3743 peter_e@gmx.net 1678 [ - + ]:CBC 1891 : if (wal_level < WAL_LEVEL_REPLICA)
4502 rhaas@postgresql.org 1679 [ # # ]:UBC 0 : ereport(ERROR,
1680 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 : : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
4502 rhaas@postgresql.org 1682 :CBC 1891 : }
1683 : :
1684 : : /*
1685 : : * Check whether the user has privilege to use replication slots.
1686 : : */
1687 : : void
1719 michael@paquier.xyz 1688 : 599 : CheckSlotPermissions(void)
1689 : : {
1171 peter@eisentraut.org 1690 [ + + ]: 599 : if (!has_rolreplication(GetUserId()))
1719 michael@paquier.xyz 1691 [ + - ]: 5 : ereport(ERROR,
1692 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1693 : : errmsg("permission denied to use replication slots"),
1694 : : errdetail("Only roles with the %s attribute may use replication slots.",
1695 : : "REPLICATION")));
1696 : 594 : }
1697 : :
1698 : : /*
1699 : : * Reserve WAL for the currently active slot.
1700 : : *
1701 : : * Compute and set restart_lsn in a manner that's appropriate for the type of
1702 : : * the slot and concurrency safe.
1703 : : */
1704 : : void
3945 andres@anarazel.de 1705 : 663 : ReplicationSlotReserveWal(void)
1706 : : {
1707 : 663 : ReplicationSlot *slot = MyReplicationSlot;
1708 : : XLogSegNo segno;
1709 : : XLogRecPtr restart_lsn;
1710 : :
1711 [ - + ]: 663 : Assert(slot != NULL);
205 alvherre@kurilemu.de 1712 [ - + ]:GNC 663 : Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
1713 [ - + ]: 663 : Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
1714 : :
1715 : : /*
1716 : : * The replication slot mechanism is used to prevent the removal of
1717 : : * required WAL.
1718 : : *
1719 : : * Acquire an exclusive lock to prevent the checkpoint process from
1720 : : * concurrently computing the minimum slot LSN (see
1721 : : * CheckPointReplicationSlots). This ensures that the WAL reserved for
1722 : : * replication cannot be removed during a checkpoint.
1723 : : *
1724 : : * The mechanism is reliable because if WAL reservation occurs first, the
1725 : : * checkpoint must wait for the restart_lsn update before determining the
1726 : : * minimum non-removable LSN. On the other hand, if the checkpoint happens
1727 : : * first, subsequent WAL reservations will select positions at or beyond
1728 : : * the redo pointer of that checkpoint.
1729 : : */
173 akapila@postgresql.o 1730 :CBC 663 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1731 : :
1732 : : /*
1733 : : * For logical slots log a standby snapshot and start logical decoding at
1734 : : * exactly that position. That allows the slot to start up more quickly.
1735 : : * But on a standby we cannot do WAL writes, so just use the replay
1736 : : * pointer; effectively, an attempt to create a logical slot on standby
1737 : : * will cause it to wait for an xl_running_xact record to be logged
1738 : : * independently on the primary, so that a snapshot can be built using the
1739 : : * record.
1740 : : *
1741 : : * None of this is needed (or indeed helpful) for physical slots as
1742 : : * they'll start replay at the last logged checkpoint anyway. Instead,
1743 : : * return the location of the last redo LSN, where a base backup has to
1744 : : * start replay at.
1745 : : */
1746 [ + + ]: 663 : if (SlotIsPhysical(slot))
1747 : 168 : restart_lsn = GetRedoRecPtr();
1748 [ + + ]: 495 : else if (RecoveryInProgress())
1749 : 26 : restart_lsn = GetXLogReplayRecPtr(NULL);
1750 : : else
1751 : 469 : restart_lsn = GetXLogInsertRecPtr();
1752 : :
1753 [ - + ]: 663 : SpinLockAcquire(&slot->mutex);
1754 : 663 : slot->data.restart_lsn = restart_lsn;
1755 : 663 : SpinLockRelease(&slot->mutex);
1756 : :
1757 : : /* prevent WAL removal as fast as possible */
1758 : 663 : ReplicationSlotsComputeRequiredLSN();
1759 : :
1760 : : /* Checkpoint shouldn't remove the required WAL. */
1761 : 663 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1762 [ - + ]: 663 : if (XLogGetLastRemovedSegno() >= segno)
173 akapila@postgresql.o 1763 [ # # ]:UBC 0 : elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1764 : : NameStr(slot->data.name));
1765 : :
173 akapila@postgresql.o 1766 :CBC 663 : LWLockRelease(ReplicationSlotAllocationLock);
1767 : :
1148 andres@anarazel.de 1768 [ + + + + ]: 663 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1769 : : {
1770 : : XLogRecPtr flushptr;
1771 : :
1772 : : /* make sure we have enough information to start */
7 alvherre@kurilemu.de 1773 : 469 : flushptr = LogStandbySnapshot();
1774 : :
1775 : : /* and make sure it's fsynced to disk */
1148 andres@anarazel.de 1776 : 469 : XLogFlush(flushptr);
1777 : : }
3945 1778 : 663 : }
1779 : :
1780 : : /*
1781 : : * Report that replication slot needs to be invalidated
1782 : : */
1783 : : static void
1149 1784 : 23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1785 : : bool terminating,
1786 : : int pid,
1787 : : NameData slotname,
1788 : : XLogRecPtr restart_lsn,
1789 : : XLogRecPtr oldestLSN,
1790 : : TransactionId snapshotConflictHorizon,
1791 : : long slot_idle_seconds)
1792 : : {
1793 : : StringInfoData err_detail;
1794 : : StringInfoData err_hint;
1795 : :
1796 : 23 : initStringInfo(&err_detail);
465 akapila@postgresql.o 1797 : 23 : initStringInfo(&err_hint);
1798 : :
1149 andres@anarazel.de 1799 [ + + + - : 23 : switch (cause)
- - ]
1800 : : {
1801 : 7 : case RS_INVAL_WAL_REMOVED:
1802 : : {
427 peter@eisentraut.org 1803 : 7 : uint64 ex = oldestLSN - restart_lsn;
1804 : :
1010 1805 : 7 : appendStringInfo(&err_detail,
327 alvherre@kurilemu.de 1806 :GNC 7 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1807 : : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1808 : : ex),
1010 peter@eisentraut.org 1809 :CBC 7 : LSN_FORMAT_ARGS(restart_lsn),
1810 : : ex);
1811 : : /* translator: %s is a GUC variable name */
465 akapila@postgresql.o 1812 : 7 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1813 : : "max_slot_wal_keep_size");
1010 peter@eisentraut.org 1814 : 7 : break;
1815 : : }
1149 andres@anarazel.de 1816 : 12 : case RS_INVAL_HORIZON:
1817 : 12 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1818 : : snapshotConflictHorizon);
1819 : 12 : break;
1820 : :
1821 : 4 : case RS_INVAL_WAL_LEVEL:
158 msawada@postgresql.o 1822 :GNC 4 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1149 andres@anarazel.de 1823 :CBC 4 : break;
1824 : :
465 akapila@postgresql.o 1825 :UBC 0 : case RS_INVAL_IDLE_TIMEOUT:
1826 : : {
1827 : : /* translator: %s is a GUC variable name */
323 fujii@postgresql.org 1828 : 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1829 : : slot_idle_seconds, "idle_replication_slot_timeout",
1830 : : idle_replication_slot_timeout_secs);
1831 : : /* translator: %s is a GUC variable name */
465 akapila@postgresql.o 1832 : 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1833 : : "idle_replication_slot_timeout");
1834 : 0 : break;
1835 : : }
1149 andres@anarazel.de 1836 : 0 : case RS_INVAL_NONE:
1837 : 0 : pg_unreachable();
1838 : : }
1839 : :
1149 andres@anarazel.de 1840 [ + - + + :CBC 23 : ereport(LOG,
+ + ]
1841 : : terminating ?
1842 : : errmsg("terminating process %d to release replication slot \"%s\"",
1843 : : pid, NameStr(slotname)) :
1844 : : errmsg("invalidating obsolete replication slot \"%s\"",
1845 : : NameStr(slotname)),
1846 : : errdetail_internal("%s", err_detail.data),
1847 : : err_hint.len ? errhint("%s", err_hint.data) : 0);
1848 : :
1849 : 23 : pfree(err_detail.data);
465 akapila@postgresql.o 1850 : 23 : pfree(err_hint.data);
1851 : 23 : }
1852 : :
1853 : : /*
1854 : : * Can we invalidate an idle replication slot?
1855 : : *
1856 : : * Idle timeout invalidation is allowed only when:
1857 : : *
1858 : : * 1. Idle timeout is set
1859 : : * 2. Slot has reserved WAL
1860 : : * 3. Slot is inactive
1861 : : * 4. The slot is not being synced from the primary while the server is in
1862 : : * recovery. This is because synced slots are always considered to be
1863 : : * inactive because they don't perform logical decoding to produce changes.
1864 : : */
1865 : : static inline bool
1866 : 390 : CanInvalidateIdleSlot(ReplicationSlot *s)
1867 : : {
323 fujii@postgresql.org 1868 : 390 : return (idle_replication_slot_timeout_secs != 0 &&
205 alvherre@kurilemu.de 1869 [ # # ]:UNC 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
465 akapila@postgresql.o 1870 [ - + - - ]:CBC 390 : s->inactive_since > 0 &&
465 akapila@postgresql.o 1871 [ # # # # ]:UBC 0 : !(RecoveryInProgress() && s->data.synced));
1872 : : }
1873 : :
1874 : : /*
1875 : : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1876 : : * becomes invalid among the given possible causes.
1877 : : *
1878 : : * This function sequentially checks all possible invalidation causes and
1879 : : * returns the first one for which the slot is eligible for invalidation.
1880 : : */
1881 : : static ReplicationSlotInvalidationCause
465 akapila@postgresql.o 1882 :CBC 423 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1883 : : XLogRecPtr oldestLSN, Oid dboid,
1884 : : TransactionId snapshotConflictHorizon,
1885 : : TimestampTz *inactive_since, TimestampTz now)
1886 : : {
1887 [ - + ]: 423 : Assert(possible_causes != RS_INVAL_NONE);
1888 : :
1889 [ + + ]: 423 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1890 : : {
212 michael@paquier.xyz 1891 : 397 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1892 : :
205 alvherre@kurilemu.de 1893 [ + + + + ]:GNC 397 : if (XLogRecPtrIsValid(restart_lsn) &&
1894 : : restart_lsn < oldestLSN)
465 akapila@postgresql.o 1895 :CBC 7 : return RS_INVAL_WAL_REMOVED;
1896 : : }
1897 : :
1898 [ + + ]: 416 : if (possible_causes & RS_INVAL_HORIZON)
1899 : : {
1900 : : /* invalid DB oid signals a shared relation */
1901 [ + - + + ]: 22 : if (SlotIsLogical(s) &&
1902 [ + - ]: 17 : (dboid == InvalidOid || dboid == s->data.database))
1903 : : {
212 michael@paquier.xyz 1904 : 22 : TransactionId effective_xmin = s->effective_xmin;
1905 : 22 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1906 : :
1907 [ - + - - ]: 22 : if (TransactionIdIsValid(effective_xmin) &&
212 michael@paquier.xyz 1908 :UBC 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1909 : : snapshotConflictHorizon))
465 akapila@postgresql.o 1910 : 0 : return RS_INVAL_HORIZON;
212 michael@paquier.xyz 1911 [ + - + + ]:CBC 44 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1912 : 22 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1913 : : snapshotConflictHorizon))
465 akapila@postgresql.o 1914 : 12 : return RS_INVAL_HORIZON;
1915 : : }
1916 : : }
1917 : :
1918 [ + + ]: 404 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1919 : : {
1920 [ + - ]: 4 : if (SlotIsLogical(s))
1921 : 4 : return RS_INVAL_WAL_LEVEL;
1922 : : }
1923 : :
1924 [ + + ]: 400 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1925 : : {
1926 [ - + ]: 390 : Assert(now > 0);
1927 : :
1928 [ - + ]: 390 : if (CanInvalidateIdleSlot(s))
1929 : : {
1930 : : /*
1931 : : * Simulate the invalidation due to idle_timeout to test the
1932 : : * timeout behavior promptly, without waiting for it to trigger
1933 : : * naturally.
1934 : : */
1935 : : #ifdef USE_INJECTION_POINTS
465 akapila@postgresql.o 1936 [ # # ]:UBC 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1937 : : {
1938 : 0 : *inactive_since = 0; /* since the beginning of time */
1939 : 0 : return RS_INVAL_IDLE_TIMEOUT;
1940 : : }
1941 : : #endif
1942 : :
1943 : : /*
1944 : : * Check if the slot needs to be invalidated due to
1945 : : * idle_replication_slot_timeout GUC.
1946 : : */
1947 [ # # ]: 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1948 : : idle_replication_slot_timeout_secs))
1949 : : {
1950 : 0 : *inactive_since = s->inactive_since;
1951 : 0 : return RS_INVAL_IDLE_TIMEOUT;
1952 : : }
1953 : : }
1954 : : }
1955 : :
465 akapila@postgresql.o 1956 :CBC 400 : return RS_INVAL_NONE;
1957 : : }
1958 : :
1959 : : /*
1960 : : * Helper for InvalidateObsoleteReplicationSlots
1961 : : *
1962 : : * Acquires the given slot and mark it invalid, if necessary and possible.
1963 : : *
1964 : : * Returns true if the slot was invalidated.
1965 : : *
1966 : : * Set *released_lock_out if ReplicationSlotControlLock was released in the
1967 : : * interim (and in that case we're not holding the lock at return, otherwise
1968 : : * we are).
1969 : : *
1970 : : * This is inherently racy, because we release the LWLock
1971 : : * for syscalls, so caller must restart if we return true.
1972 : : */
1973 : : static bool
1974 : 462 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1975 : : ReplicationSlot *s,
1976 : : XLogRecPtr oldestLSN,
1977 : : Oid dboid, TransactionId snapshotConflictHorizon,
1978 : : bool *released_lock_out)
1979 : : {
1814 alvherre@alvh.no-ip. 1980 : 462 : int last_signaled_pid = 0;
1981 : 462 : bool released_lock = false;
158 msawada@postgresql.o 1982 :GNC 462 : bool invalidated = false;
465 akapila@postgresql.o 1983 :CBC 462 : TimestampTz inactive_since = 0;
1984 : :
1985 : : for (;;)
2244 alvherre@alvh.no-ip. 1986 : 7 : {
1987 : : XLogRecPtr restart_lsn;
1988 : : NameData slotname;
1989 : : ProcNumber active_proc;
1814 1990 : 469 : int active_pid = 0;
799 akapila@postgresql.o 1991 : 469 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
465 1992 : 469 : TimestampTz now = 0;
1993 : 469 : long slot_idle_secs = 0;
1994 : :
1814 alvherre@alvh.no-ip. 1995 [ - + ]: 469 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1996 : :
2244 1997 [ - + ]: 469 : if (!s->in_use)
1998 : : {
1814 alvherre@alvh.no-ip. 1999 [ # # ]:UBC 0 : if (released_lock)
2000 : 0 : LWLockRelease(ReplicationSlotControlLock);
2001 : 0 : break;
2002 : : }
2003 : :
465 akapila@postgresql.o 2004 [ + + ]:CBC 469 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
2005 : : {
2006 : : /*
2007 : : * Assign the current time here to avoid system call overhead
2008 : : * while holding the spinlock in subsequent code.
2009 : : */
2010 : 411 : now = GetCurrentTimestamp();
2011 : : }
2012 : :
2013 : : /*
2014 : : * Check if the slot needs to be invalidated. If it needs to be
2015 : : * invalidated, and is not currently acquired, acquire it and mark it
2016 : : * as having been invalidated. We do this with the spinlock held to
2017 : : * avoid race conditions -- for example the restart_lsn could move
2018 : : * forward, or the slot could be dropped.
2019 : : */
2244 alvherre@alvh.no-ip. 2020 [ - + ]: 469 : SpinLockAcquire(&s->mutex);
2021 : :
2022 : 469 : restart_lsn = s->data.restart_lsn;
2023 : :
2024 : : /* we do nothing if the slot is already invalid */
1149 andres@anarazel.de 2025 [ + + ]: 469 : if (s->data.invalidated == RS_INVAL_NONE)
465 akapila@postgresql.o 2026 : 423 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2027 : : s, oldestLSN,
2028 : : dboid,
2029 : : snapshotConflictHorizon,
2030 : : &inactive_since,
2031 : : now);
2032 : :
2033 : : /* if there's no invalidation, we're done */
799 2034 [ + + ]: 469 : if (invalidation_cause == RS_INVAL_NONE)
2035 : : {
1814 alvherre@alvh.no-ip. 2036 : 446 : SpinLockRelease(&s->mutex);
2037 [ - + ]: 446 : if (released_lock)
1814 alvherre@alvh.no-ip. 2038 :UBC 0 : LWLockRelease(ReplicationSlotControlLock);
1814 alvherre@alvh.no-ip. 2039 :CBC 446 : break;
2040 : : }
2041 : :
2042 : 23 : slotname = s->data.name;
109 heikki.linnakangas@i 2043 :GNC 23 : active_proc = s->active_proc;
2044 : :
2045 : : /*
2046 : : * If the slot can be acquired, do so and mark it invalidated
2047 : : * immediately. Otherwise we'll signal the owning process, below, and
2048 : : * retry.
2049 : : *
2050 : : * Note: Unlike other slot attributes, slot's inactive_since can't be
2051 : : * changed until the acquired slot is released or the owning process
2052 : : * is terminated. So, the inactive slot can only be invalidated
2053 : : * immediately without being terminated.
2054 : : */
2055 [ + + ]: 23 : if (active_proc == INVALID_PROC_NUMBER)
2056 : : {
1814 alvherre@alvh.no-ip. 2057 :CBC 16 : MyReplicationSlot = s;
109 heikki.linnakangas@i 2058 :GNC 16 : s->active_proc = MyProcNumber;
799 akapila@postgresql.o 2059 :CBC 16 : s->data.invalidated = invalidation_cause;
2060 : :
2061 : : /*
2062 : : * XXX: We should consider not overwriting restart_lsn and instead
2063 : : * just rely on .invalidated.
2064 : : */
2065 [ + + ]: 16 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2066 : : {
1149 andres@anarazel.de 2067 : 5 : s->data.restart_lsn = InvalidXLogRecPtr;
350 akorotkov@postgresql 2068 : 5 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
2069 : : }
2070 : :
2071 : : /* Let caller know */
158 msawada@postgresql.o 2072 :GNC 16 : invalidated = true;
2073 : : }
2074 : : else
2075 : : {
109 heikki.linnakangas@i 2076 : 7 : active_pid = GetPGProcByNumber(active_proc)->pid;
2077 [ - + ]: 7 : Assert(active_pid != 0);
2078 : : }
2079 : :
1814 alvherre@alvh.no-ip. 2080 :CBC 23 : SpinLockRelease(&s->mutex);
2081 : :
2082 : : /*
2083 : : * Calculate the idle time duration of the slot if slot is marked
2084 : : * invalidated with RS_INVAL_IDLE_TIMEOUT.
2085 : : */
465 akapila@postgresql.o 2086 [ - + ]: 23 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2087 : : {
2088 : : int slot_idle_usecs;
2089 : :
465 akapila@postgresql.o 2090 :UBC 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
2091 : : &slot_idle_usecs);
2092 : : }
2093 : :
109 heikki.linnakangas@i 2094 [ + + ]:GNC 23 : if (active_proc != INVALID_PROC_NUMBER)
2095 : : {
2096 : : /*
2097 : : * Prepare the sleep on the slot's condition variable before
2098 : : * releasing the lock, to close a possible race condition if the
2099 : : * slot is released before the sleep below.
2100 : : */
1814 alvherre@alvh.no-ip. 2101 :CBC 7 : ConditionVariablePrepareToSleep(&s->active_cv);
2102 : :
2103 : 7 : LWLockRelease(ReplicationSlotControlLock);
2104 : 7 : released_lock = true;
2105 : :
2106 : : /*
2107 : : * Signal to terminate the process that owns the slot, if we
2108 : : * haven't already signalled it. (Avoidance of repeated
2109 : : * signalling is the only reason for there to be a loop in this
2110 : : * routine; otherwise we could rely on caller's restart loop.)
2111 : : *
2112 : : * There is the race condition that other process may own the slot
2113 : : * after its current owner process is terminated and before this
2114 : : * process owns it. To handle that, we signal only if the PID of
2115 : : * the owning process has changed from the previous time. (This
2116 : : * logic assumes that the same PID is not reused very quickly.)
2117 : : */
2118 [ + - ]: 7 : if (last_signaled_pid != active_pid)
2119 : : {
799 akapila@postgresql.o 2120 : 7 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2121 : : slotname, restart_lsn,
2122 : : oldestLSN, snapshotConflictHorizon,
2123 : : slot_idle_secs);
2124 : :
1149 andres@anarazel.de 2125 [ + + ]: 7 : if (MyBackendType == B_STARTUP)
109 heikki.linnakangas@i 2126 :GNC 5 : (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2127 : : active_pid,
2128 : : RECOVERY_CONFLICT_LOGICALSLOT);
2129 : : else
1149 andres@anarazel.de 2130 :CBC 2 : (void) kill(active_pid, SIGTERM);
2131 : :
1814 alvherre@alvh.no-ip. 2132 : 7 : last_signaled_pid = active_pid;
2133 : : }
2134 : :
2135 : : /* Wait until the slot is released. */
2136 : 7 : ConditionVariableSleep(&s->active_cv,
2137 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
2138 : :
2139 : : /*
2140 : : * Re-acquire lock and start over; we expect to invalidate the
2141 : : * slot next time (unless another process acquires the slot in the
2142 : : * meantime).
2143 : : *
2144 : : * Note: It is possible for a slot to advance its restart_lsn or
2145 : : * xmin values sufficiently between when we release the mutex and
2146 : : * when we recheck, moving from a conflicting state to a non
2147 : : * conflicting state. This is intentional and safe: if the slot
2148 : : * has caught up while we're busy here, the resources we were
2149 : : * concerned about (WAL segments or tuples) have not yet been
2150 : : * removed, and there's no reason to invalidate the slot.
2151 : : */
2152 : 7 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2153 : 7 : continue;
2154 : : }
2155 : : else
2156 : : {
2157 : : /*
2158 : : * We hold the slot now and have already invalidated it; flush it
2159 : : * to ensure that state persists.
2160 : : *
2161 : : * Don't want to hold ReplicationSlotControlLock across file
2162 : : * system operations, so release it now but be sure to tell caller
2163 : : * to restart from scratch.
2164 : : */
2165 : 16 : LWLockRelease(ReplicationSlotControlLock);
2166 : 16 : released_lock = true;
2167 : :
2168 : : /* Make sure the invalidated state persists across server restart */
2169 : 16 : ReplicationSlotMarkDirty();
2170 : 16 : ReplicationSlotSave();
2171 : 16 : ReplicationSlotRelease();
2172 : :
799 akapila@postgresql.o 2173 : 16 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2174 : : slotname, restart_lsn,
2175 : : oldestLSN, snapshotConflictHorizon,
2176 : : slot_idle_secs);
2177 : :
2178 : : /* done with this slot for now */
1814 alvherre@alvh.no-ip. 2179 : 16 : break;
2180 : : }
2181 : : }
2182 : :
2183 [ - + ]: 462 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2184 : :
158 msawada@postgresql.o 2185 :GNC 462 : *released_lock_out = released_lock;
2186 : 462 : return invalidated;
2187 : : }
2188 : :
2189 : : /*
2190 : : * Invalidate slots that require resources about to be removed.
2191 : : *
2192 : : * Returns true when any slot have got invalidated.
2193 : : *
2194 : : * Whether a slot needs to be invalidated depends on the invalidation cause.
2195 : : * A slot is invalidated if it:
2196 : : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2197 : : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2198 : : * db; dboid may be InvalidOid for shared relations
2199 : : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2200 : : * logical.
2201 : : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2202 : : * "idle_replication_slot_timeout" duration.
2203 : : *
2204 : : * Note: This function attempts to invalidate the slot for multiple possible
2205 : : * causes in a single pass, minimizing redundant iterations. The "cause"
2206 : : * parameter can be a MASK representing one or more of the defined causes.
2207 : : *
2208 : : * If it invalidates the last logical slot in the cluster, it requests to
2209 : : * disable logical decoding.
2210 : : *
2211 : : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2212 : : */
2213 : : bool
465 akapila@postgresql.o 2214 :CBC 1975 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2215 : : XLogSegNo oldestSegno, Oid dboid,
2216 : : TransactionId snapshotConflictHorizon)
2217 : : {
2218 : : XLogRecPtr oldestLSN;
1779 alvherre@alvh.no-ip. 2219 : 1975 : bool invalidated = false;
158 msawada@postgresql.o 2220 :GNC 1975 : bool invalidated_logical = false;
2221 : : bool found_valid_logicalslot;
2222 : :
465 akapila@postgresql.o 2223 [ + + - + ]:CBC 1975 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2224 [ + + - + ]: 1975 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2225 [ - + ]: 1975 : Assert(possible_causes != RS_INVAL_NONE);
2226 : :
53 alvherre@kurilemu.de 2227 [ + + - + ]:GNC 1975 : if (max_replication_slots == 0 && max_repack_replication_slots == 0)
1149 andres@anarazel.de 2228 :UBC 0 : return invalidated;
2229 : :
1814 alvherre@alvh.no-ip. 2230 :CBC 1975 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2231 : :
2232 : 1991 : restart:
158 msawada@postgresql.o 2233 :GNC 1991 : found_valid_logicalslot = false;
1814 alvherre@alvh.no-ip. 2234 :CBC 1991 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
53 alvherre@kurilemu.de 2235 [ + + ]:GNC 31255 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2236 : : {
1814 alvherre@alvh.no-ip. 2237 :CBC 29280 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
158 msawada@postgresql.o 2238 :GNC 29280 : bool released_lock = false;
2239 : :
1814 alvherre@alvh.no-ip. 2240 [ + + ]:CBC 29280 : if (!s->in_use)
2241 : 28818 : continue;
2242 : :
2243 : : /* Prevent invalidation of logical slots during binary upgrade */
323 akapila@postgresql.o 2244 [ + + + + ]: 471 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2245 : : {
158 msawada@postgresql.o 2246 :GNC 9 : SpinLockAcquire(&s->mutex);
2247 : 9 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2248 : 9 : SpinLockRelease(&s->mutex);
2249 : :
323 akapila@postgresql.o 2250 :CBC 9 : continue;
2251 : : }
2252 : :
158 msawada@postgresql.o 2253 [ + + ]:GNC 462 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2254 : : dboid, snapshotConflictHorizon,
2255 : : &released_lock))
2256 : : {
2257 [ - + ]: 16 : Assert(released_lock);
2258 : :
2259 : : /* Remember we have invalidated a physical or logical slot */
2260 : 16 : invalidated = true;
2261 : :
2262 : : /*
2263 : : * Additionally, remember we have invalidated a logical slot as we
2264 : : * can request disabling logical decoding later.
2265 : : */
2266 [ + + ]: 16 : if (SlotIsLogical(s))
2267 : 13 : invalidated_logical = true;
2268 : : }
2269 : : else
2270 : : {
2271 : : /*
2272 : : * We need to check if the slot is invalidated here since
2273 : : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2274 : : * is already invalidated.
2275 : : */
2276 : 446 : SpinLockAcquire(&s->mutex);
2277 : 892 : found_valid_logicalslot |=
2278 [ + + + + ]: 446 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2279 : 446 : SpinLockRelease(&s->mutex);
2280 : : }
2281 : :
2282 : : /* if the lock was released, start from scratch */
2283 [ + + ]: 462 : if (released_lock)
2284 : 16 : goto restart;
2285 : : }
2244 alvherre@alvh.no-ip. 2286 :CBC 1975 : LWLockRelease(ReplicationSlotControlLock);
2287 : :
2288 : : /*
2289 : : * If any slots have been invalidated, recalculate the resource limits.
2290 : : */
1779 2291 [ + + ]: 1975 : if (invalidated)
2292 : : {
2293 : 11 : ReplicationSlotsComputeRequiredXmin(false);
2294 : 11 : ReplicationSlotsComputeRequiredLSN();
2295 : : }
2296 : :
2297 : : /*
2298 : : * Request the checkpointer to disable logical decoding if no valid
2299 : : * logical slots remain. If called by the checkpointer during a
2300 : : * checkpoint, only the request is initiated; actual deactivation is
2301 : : * deferred until after the checkpoint completes.
2302 : : */
158 msawada@postgresql.o 2303 [ + + + - ]:GNC 1975 : if (invalidated_logical && !found_valid_logicalslot)
2304 : 8 : RequestDisableLogicalDecoding();
2305 : :
1779 alvherre@alvh.no-ip. 2306 :CBC 1975 : return invalidated;
2307 : : }
2308 : :
2309 : : /*
2310 : : * Flush all replication slots to disk.
2311 : : *
2312 : : * It is convenient to flush dirty replication slots at the time of checkpoint.
2313 : : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2314 : : * for which the confirmed_flush LSN has been updated since the last time it
2315 : : * was saved and flush them.
2316 : : */
2317 : : void
989 akapila@postgresql.o 2318 : 1950 : CheckPointReplicationSlots(bool is_shutdown)
2319 : : {
2320 : : int i;
337 akorotkov@postgresql 2321 : 1950 : bool last_saved_restart_lsn_updated = false;
2322 : :
4218 peter_e@gmx.net 2323 [ + + ]: 1950 : elog(DEBUG1, "performing replication slot checkpoint");
2324 : :
2325 : : /*
2326 : : * Prevent any slot from being created/dropped while we're active. As we
2327 : : * explicitly do *not* want to block iterating over replication_slots or
2328 : : * acquiring a slot we cannot take the control lock - but that's OK,
2329 : : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2330 : : * enough to guarantee that nobody can change the in_use bits on us.
2331 : : *
2332 : : * Additionally, acquiring the Allocation lock is necessary to serialize
2333 : : * the slot flush process with concurrent slot WAL reservation. This
2334 : : * ensures that the WAL position being reserved is either flushed to disk
2335 : : * or is beyond or equal to the redo pointer of the current checkpoint
2336 : : * (See ReplicationSlotReserveWal for details).
2337 : : */
4502 rhaas@postgresql.org 2338 : 1950 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2339 : :
53 alvherre@kurilemu.de 2340 [ + + ]:GNC 30919 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2341 : : {
4502 rhaas@postgresql.org 2342 :CBC 28969 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2343 : : char path[MAXPGPATH];
2344 : :
2345 [ + + ]: 28969 : if (!s->in_use)
2346 : 28556 : continue;
2347 : :
2348 : : /* save the slot to disk, locking is handled in SaveSlotToPath() */
638 michael@paquier.xyz 2349 : 413 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2350 : :
2351 : : /*
2352 : : * Slot's data is not flushed each time the confirmed_flush LSN is
2353 : : * updated as that could lead to frequent writes. However, we decide
2354 : : * to force a flush of all logical slot's data at the time of shutdown
2355 : : * if the confirmed_flush LSN is changed since we last flushed it to
2356 : : * disk. This helps in avoiding an unnecessary retreat of the
2357 : : * confirmed_flush LSN after restart.
2358 : : */
989 akapila@postgresql.o 2359 [ + + + + ]: 413 : if (is_shutdown && SlotIsLogical(s))
2360 : : {
2361 [ - + ]: 94 : SpinLockAcquire(&s->mutex);
2362 : :
2363 [ + + ]: 94 : if (s->data.invalidated == RS_INVAL_NONE &&
718 2364 [ + + ]: 93 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2365 : : {
989 2366 : 41 : s->just_dirtied = true;
2367 : 41 : s->dirty = true;
2368 : : }
2369 : 94 : SpinLockRelease(&s->mutex);
2370 : : }
2371 : :
2372 : : /*
2373 : : * Track if we're going to update slot's last_saved_restart_lsn. We
2374 : : * need this to know if we need to recompute the required LSN.
2375 : : */
337 akorotkov@postgresql 2376 [ + + ]: 413 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2377 : 217 : last_saved_restart_lsn_updated = true;
2378 : :
4502 rhaas@postgresql.org 2379 : 413 : SaveSlotToPath(s, path, LOG);
2380 : : }
2381 : 1950 : LWLockRelease(ReplicationSlotAllocationLock);
2382 : :
2383 : : /*
2384 : : * Recompute the required LSN if SaveSlotToPath() updated
2385 : : * last_saved_restart_lsn for any slot.
2386 : : */
337 akorotkov@postgresql 2387 [ + + ]: 1950 : if (last_saved_restart_lsn_updated)
2388 : 217 : ReplicationSlotsComputeRequiredLSN();
4502 rhaas@postgresql.org 2389 : 1950 : }
2390 : :
2391 : : /*
2392 : : * Load all replication slots from disk into memory at server startup. This
2393 : : * needs to be run before we start crash recovery.
2394 : : */
2395 : : void
4370 andres@anarazel.de 2396 : 1085 : StartupReplicationSlots(void)
2397 : : {
2398 : : DIR *replication_dir;
2399 : : struct dirent *replication_de;
2400 : :
4218 peter_e@gmx.net 2401 [ + + ]: 1085 : elog(DEBUG1, "starting up replication slots");
2402 : :
2403 : : /* restore all slots by iterating over all on-disk entries */
638 michael@paquier.xyz 2404 : 1085 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2405 [ + + ]: 3375 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2406 : : {
2407 : : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2408 : : PGFileType de_type;
2409 : :
4502 rhaas@postgresql.org 2410 [ + + ]: 2292 : if (strcmp(replication_de->d_name, ".") == 0 ||
2411 [ + + ]: 1209 : strcmp(replication_de->d_name, "..") == 0)
2412 : 2166 : continue;
2413 : :
638 michael@paquier.xyz 2414 : 126 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
1366 2415 : 126 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2416 : :
2417 : : /* we're only creating directories here, skip if it's not our's */
2418 [ + - - + ]: 126 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
4502 rhaas@postgresql.org 2419 :UBC 0 : continue;
2420 : :
2421 : : /* we crashed while a slot was being setup or deleted, clean up */
4165 andres@anarazel.de 2422 [ - + ]:CBC 126 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2423 : : {
4502 rhaas@postgresql.org 2424 [ # # ]:UBC 0 : if (!rmtree(path, true))
2425 : : {
2426 [ # # ]: 0 : ereport(WARNING,
2427 : : (errmsg("could not remove directory \"%s\"",
2428 : : path)));
2429 : 0 : continue;
2430 : : }
638 michael@paquier.xyz 2431 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
4502 rhaas@postgresql.org 2432 : 0 : continue;
2433 : : }
2434 : :
2435 : : /* looks like a slot in a normal state, restore */
4502 rhaas@postgresql.org 2436 :CBC 126 : RestoreSlotFromDisk(replication_de->d_name);
2437 : : }
2438 : 1083 : FreeDir(replication_dir);
2439 : :
2440 : : /* currently no slots exist, we're done. */
53 alvherre@kurilemu.de 2441 [ - + ]:GNC 1083 : if (max_replication_slots + max_repack_replication_slots <= 0)
4502 rhaas@postgresql.org 2442 :UBC 0 : return;
2443 : :
2444 : : /* Now that we have recovered all the data, compute replication xmin */
4471 rhaas@postgresql.org 2445 :CBC 1083 : ReplicationSlotsComputeRequiredXmin(false);
4502 2446 : 1083 : ReplicationSlotsComputeRequiredLSN();
2447 : : }
2448 : :
2449 : : /* ----
2450 : : * Manipulation of on-disk state of replication slots
2451 : : *
2452 : : * NB: none of the routines below should take any notice whether a slot is the
2453 : : * current one or not, that's all handled a layer above.
2454 : : * ----
2455 : : */
2456 : : static void
2457 : 716 : CreateSlotOnDisk(ReplicationSlot *slot)
2458 : : {
2459 : : char tmppath[MAXPGPATH];
2460 : : char path[MAXPGPATH];
2461 : : struct stat st;
2462 : :
2463 : : /*
2464 : : * No need to take out the io_in_progress_lock, nobody else can see this
2465 : : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2466 : : * takes out the lock, if we'd take the lock here, we'd deadlock.
2467 : : */
2468 : :
638 michael@paquier.xyz 2469 : 716 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2470 : 716 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2471 : :
2472 : : /*
2473 : : * It's just barely possible that some previous effort to create or drop a
2474 : : * slot with this name left a temp directory lying around. If that seems
2475 : : * to be the case, try to remove it. If the rmtree() fails, we'll error
2476 : : * out at the MakePGDirectory() below, so we don't bother checking
2477 : : * success.
2478 : : */
4502 rhaas@postgresql.org 2479 [ - + - - ]: 716 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
4502 rhaas@postgresql.org 2480 :UBC 0 : rmtree(tmppath, true);
2481 : :
2482 : : /* Create and fsync the temporary slot directory. */
2975 sfrost@snowman.net 2483 [ - + ]:CBC 716 : if (MakePGDirectory(tmppath) < 0)
4502 rhaas@postgresql.org 2484 [ # # ]:UBC 0 : ereport(ERROR,
2485 : : (errcode_for_file_access(),
2486 : : errmsg("could not create directory \"%s\": %m",
2487 : : tmppath)));
4502 rhaas@postgresql.org 2488 :CBC 716 : fsync_fname(tmppath, true);
2489 : :
2490 : : /* Write the actual state file. */
4407 bruce@momjian.us 2491 : 716 : slot->dirty = true; /* signal that we really need to write */
4502 rhaas@postgresql.org 2492 : 716 : SaveSlotToPath(slot, tmppath, ERROR);
2493 : :
2494 : : /* Rename the directory into place. */
2495 [ - + ]: 716 : if (rename(tmppath, path) != 0)
4502 rhaas@postgresql.org 2496 [ # # ]:UBC 0 : ereport(ERROR,
2497 : : (errcode_for_file_access(),
2498 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2499 : : tmppath, path)));
2500 : :
2501 : : /*
2502 : : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2503 : : * would persist after an OS crash or not - so, force a restart. The
2504 : : * restart would try to fsync this again till it works.
2505 : : */
4502 rhaas@postgresql.org 2506 :CBC 716 : START_CRIT_SECTION();
2507 : :
2508 : 716 : fsync_fname(path, true);
638 michael@paquier.xyz 2509 : 716 : fsync_fname(PG_REPLSLOT_DIR, true);
2510 : :
4502 rhaas@postgresql.org 2511 [ - + ]: 716 : END_CRIT_SECTION();
2512 : 716 : }
2513 : :
2514 : : /*
2515 : : * Shared functionality between saving and creating a replication slot.
2516 : : */
2517 : : static void
2518 : 2627 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2519 : : {
2520 : : char tmppath[MAXPGPATH];
2521 : : char path[MAXPGPATH];
2522 : : int fd;
2523 : : ReplicationSlotOnDisk cp;
2524 : : bool was_dirty;
2525 : :
2526 : : /* first check whether there's something to write out */
3889 2527 [ - + ]: 2627 : SpinLockAcquire(&slot->mutex);
2528 : 2627 : was_dirty = slot->dirty;
2529 : 2627 : slot->just_dirtied = false;
2530 : 2627 : SpinLockRelease(&slot->mutex);
2531 : :
2532 : : /* and don't do anything if there's nothing to write */
4502 2533 [ + + ]: 2627 : if (!was_dirty)
2534 : 144 : return;
2535 : :
3774 2536 : 2483 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2537 : :
2538 : : /* silence valgrind :( */
4502 2539 : 2483 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2540 : :
2541 : 2483 : sprintf(tmppath, "%s/state.tmp", dir);
2542 : 2483 : sprintf(path, "%s/state", dir);
2543 : :
3171 peter_e@gmx.net 2544 : 2483 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
4502 rhaas@postgresql.org 2545 [ - + ]: 2483 : if (fd < 0)
2546 : : {
2547 : : /*
2548 : : * If not an ERROR, then release the lock before returning. In case
2549 : : * of an ERROR, the error recovery path automatically releases the
2550 : : * lock, but no harm in explicitly releasing even in that case. Note
2551 : : * that LWLockRelease() could affect errno.
2552 : : */
2246 peter@eisentraut.org 2553 :UBC 0 : int save_errno = errno;
2554 : :
2256 2555 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2246 2556 : 0 : errno = save_errno;
4502 rhaas@postgresql.org 2557 [ # # ]: 0 : ereport(elevel,
2558 : : (errcode_for_file_access(),
2559 : : errmsg("could not create file \"%s\": %m",
2560 : : tmppath)));
2561 : 0 : return;
2562 : : }
2563 : :
4502 rhaas@postgresql.org 2564 :CBC 2483 : cp.magic = SLOT_MAGIC;
4225 heikki.linnakangas@i 2565 : 2483 : INIT_CRC32C(cp.checksum);
4217 andres@anarazel.de 2566 : 2483 : cp.version = SLOT_VERSION;
2567 : 2483 : cp.length = ReplicationSlotOnDiskV2Size;
2568 : :
4502 rhaas@postgresql.org 2569 [ - + ]: 2483 : SpinLockAcquire(&slot->mutex);
2570 : :
2571 : 2483 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2572 : :
2573 : 2483 : SpinLockRelease(&slot->mutex);
2574 : :
4225 heikki.linnakangas@i 2575 : 2483 : COMP_CRC32C(cp.checksum,
2576 : : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2577 : : ReplicationSlotOnDiskChecksummedSize);
4217 andres@anarazel.de 2578 : 2483 : FIN_CRC32C(cp.checksum);
2579 : :
2855 michael@paquier.xyz 2580 : 2483 : errno = 0;
3360 rhaas@postgresql.org 2581 : 2483 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
4502 2582 [ - + ]: 2483 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2583 : : {
4407 bruce@momjian.us 2584 :UBC 0 : int save_errno = errno;
2585 : :
3360 rhaas@postgresql.org 2586 : 0 : pgstat_report_wait_end();
4502 2587 : 0 : CloseTransientFile(fd);
232 michael@paquier.xyz 2588 : 0 : unlink(tmppath);
2256 peter@eisentraut.org 2589 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2590 : :
2591 : : /* if write didn't set errno, assume problem is no disk space */
2896 michael@paquier.xyz 2592 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
4502 rhaas@postgresql.org 2593 [ # # ]: 0 : ereport(elevel,
2594 : : (errcode_for_file_access(),
2595 : : errmsg("could not write to file \"%s\": %m",
2596 : : tmppath)));
2597 : 0 : return;
2598 : : }
3360 rhaas@postgresql.org 2599 :CBC 2483 : pgstat_report_wait_end();
2600 : :
2601 : : /* fsync the temporary file */
2602 : 2483 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
4502 2603 [ - + ]: 2483 : if (pg_fsync(fd) != 0)
2604 : : {
4407 bruce@momjian.us 2605 :UBC 0 : int save_errno = errno;
2606 : :
3360 rhaas@postgresql.org 2607 : 0 : pgstat_report_wait_end();
4502 2608 : 0 : CloseTransientFile(fd);
232 michael@paquier.xyz 2609 : 0 : unlink(tmppath);
2256 peter@eisentraut.org 2610 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2611 : :
4502 rhaas@postgresql.org 2612 : 0 : errno = save_errno;
2613 [ # # ]: 0 : ereport(elevel,
2614 : : (errcode_for_file_access(),
2615 : : errmsg("could not fsync file \"%s\": %m",
2616 : : tmppath)));
2617 : 0 : return;
2618 : : }
3360 rhaas@postgresql.org 2619 :CBC 2483 : pgstat_report_wait_end();
2620 : :
2520 peter@eisentraut.org 2621 [ - + ]: 2483 : if (CloseTransientFile(fd) != 0)
2622 : : {
2246 peter@eisentraut.org 2623 :UBC 0 : int save_errno = errno;
2624 : :
232 michael@paquier.xyz 2625 : 0 : unlink(tmppath);
2256 peter@eisentraut.org 2626 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2627 : :
2246 2628 : 0 : errno = save_errno;
2639 michael@paquier.xyz 2629 [ # # ]: 0 : ereport(elevel,
2630 : : (errcode_for_file_access(),
2631 : : errmsg("could not close file \"%s\": %m",
2632 : : tmppath)));
2600 2633 : 0 : return;
2634 : : }
2635 : :
2636 : : /* rename to permanent file, fsync file and directory */
4502 rhaas@postgresql.org 2637 [ - + ]:CBC 2483 : if (rename(tmppath, path) != 0)
2638 : : {
2246 peter@eisentraut.org 2639 :UBC 0 : int save_errno = errno;
2640 : :
232 michael@paquier.xyz 2641 : 0 : unlink(tmppath);
2256 peter@eisentraut.org 2642 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2643 : :
2246 2644 : 0 : errno = save_errno;
4502 rhaas@postgresql.org 2645 [ # # ]: 0 : ereport(elevel,
2646 : : (errcode_for_file_access(),
2647 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2648 : : tmppath, path)));
2649 : 0 : return;
2650 : : }
2651 : :
2652 : : /*
2653 : : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2654 : : */
4502 rhaas@postgresql.org 2655 :CBC 2483 : START_CRIT_SECTION();
2656 : :
2657 : 2483 : fsync_fname(path, false);
3734 andres@anarazel.de 2658 : 2483 : fsync_fname(dir, true);
638 michael@paquier.xyz 2659 : 2483 : fsync_fname(PG_REPLSLOT_DIR, true);
2660 : :
4502 rhaas@postgresql.org 2661 [ - + ]: 2483 : END_CRIT_SECTION();
2662 : :
2663 : : /*
2664 : : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2665 : : * already and remember the confirmed_flush LSN value.
2666 : : */
3889 2667 [ - + ]: 2483 : SpinLockAcquire(&slot->mutex);
2668 [ + + ]: 2483 : if (!slot->just_dirtied)
2669 : 2467 : slot->dirty = false;
989 akapila@postgresql.o 2670 : 2483 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
350 akorotkov@postgresql 2671 : 2483 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
3889 rhaas@postgresql.org 2672 : 2483 : SpinLockRelease(&slot->mutex);
2673 : :
3774 2674 : 2483 : LWLockRelease(&slot->io_in_progress_lock);
2675 : : }
2676 : :
2677 : : /*
2678 : : * Load a single slot from disk into memory.
2679 : : */
2680 : : static void
4502 2681 : 126 : RestoreSlotFromDisk(const char *name)
2682 : : {
2683 : : ReplicationSlotOnDisk cp;
2684 : : int i;
2685 : : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2686 : : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2687 : : int fd;
2688 : 126 : bool restored = false;
2689 : : int readBytes;
2690 : : pg_crc32c checksum;
479 akapila@postgresql.o 2691 : 126 : TimestampTz now = 0;
2692 : :
2693 : : /* no need to lock here, no concurrent access allowed yet */
2694 : :
2695 : : /* delete temp file if it exists */
638 michael@paquier.xyz 2696 : 126 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2827 2697 : 126 : sprintf(path, "%s/state.tmp", slotdir);
4502 rhaas@postgresql.org 2698 [ + - - + ]: 126 : if (unlink(path) < 0 && errno != ENOENT)
4502 rhaas@postgresql.org 2699 [ # # ]:UBC 0 : ereport(PANIC,
2700 : : (errcode_for_file_access(),
2701 : : errmsg("could not remove file \"%s\": %m", path)));
2702 : :
2827 michael@paquier.xyz 2703 :CBC 126 : sprintf(path, "%s/state", slotdir);
2704 : :
4502 rhaas@postgresql.org 2705 [ + + ]: 126 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2706 : :
2707 : : /* on some operating systems fsyncing a file requires O_RDWR */
2430 andres@anarazel.de 2708 : 126 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2709 : :
2710 : : /*
2711 : : * We do not need to handle this as we are rename()ing the directory into
2712 : : * place only after we fsync()ed the state file.
2713 : : */
4502 rhaas@postgresql.org 2714 [ - + ]: 126 : if (fd < 0)
4502 rhaas@postgresql.org 2715 [ # # ]:UBC 0 : ereport(PANIC,
2716 : : (errcode_for_file_access(),
2717 : : errmsg("could not open file \"%s\": %m", path)));
2718 : :
2719 : : /*
2720 : : * Sync state file before we're reading from it. We might have crashed
2721 : : * while it wasn't synced yet and we shouldn't continue on that basis.
2722 : : */
3360 rhaas@postgresql.org 2723 :CBC 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
4502 2724 [ - + ]: 126 : if (pg_fsync(fd) != 0)
4502 rhaas@postgresql.org 2725 [ # # ]:UBC 0 : ereport(PANIC,
2726 : : (errcode_for_file_access(),
2727 : : errmsg("could not fsync file \"%s\": %m",
2728 : : path)));
3360 rhaas@postgresql.org 2729 :CBC 126 : pgstat_report_wait_end();
2730 : :
2731 : : /* Also sync the parent directory */
4502 2732 : 126 : START_CRIT_SECTION();
2827 michael@paquier.xyz 2733 : 126 : fsync_fname(slotdir, true);
4502 rhaas@postgresql.org 2734 [ - + ]: 126 : END_CRIT_SECTION();
2735 : :
2736 : : /* read part of statefile that's guaranteed to be version independent */
3360 2737 : 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
4502 2738 : 126 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
3360 2739 : 126 : pgstat_report_wait_end();
4502 2740 [ - + ]: 126 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2741 : : {
2873 michael@paquier.xyz 2742 [ # # ]:UBC 0 : if (readBytes < 0)
2743 [ # # ]: 0 : ereport(PANIC,
2744 : : (errcode_for_file_access(),
2745 : : errmsg("could not read file \"%s\": %m", path)));
2746 : : else
2747 [ # # ]: 0 : ereport(PANIC,
2748 : : (errcode(ERRCODE_DATA_CORRUPTED),
2749 : : errmsg("could not read file \"%s\": read %d of %zu",
2750 : : path, readBytes,
2751 : : (Size) ReplicationSlotOnDiskConstantSize)));
2752 : : }
2753 : :
2754 : : /* verify magic */
4502 rhaas@postgresql.org 2755 [ - + ]:CBC 126 : if (cp.magic != SLOT_MAGIC)
4502 rhaas@postgresql.org 2756 [ # # ]:UBC 0 : ereport(PANIC,
2757 : : (errcode(ERRCODE_DATA_CORRUPTED),
2758 : : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2759 : : path, cp.magic, SLOT_MAGIC)));
2760 : :
2761 : : /* verify version */
4502 rhaas@postgresql.org 2762 [ - + ]:CBC 126 : if (cp.version != SLOT_VERSION)
4502 rhaas@postgresql.org 2763 [ # # ]:UBC 0 : ereport(PANIC,
2764 : : (errcode(ERRCODE_DATA_CORRUPTED),
2765 : : errmsg("replication slot file \"%s\" has unsupported version %u",
2766 : : path, cp.version)));
2767 : :
2768 : : /* boundary check on length */
4217 andres@anarazel.de 2769 [ - + ]:CBC 126 : if (cp.length != ReplicationSlotOnDiskV2Size)
4502 rhaas@postgresql.org 2770 [ # # ]:UBC 0 : ereport(PANIC,
2771 : : (errcode(ERRCODE_DATA_CORRUPTED),
2772 : : errmsg("replication slot file \"%s\" has corrupted length %u",
2773 : : path, cp.length)));
2774 : :
2775 : : /* Now that we know the size, read the entire file */
3360 rhaas@postgresql.org 2776 :CBC 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
4502 2777 : 252 : readBytes = read(fd,
2778 : : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2779 : 126 : cp.length);
3360 2780 : 126 : pgstat_report_wait_end();
4502 2781 [ - + ]: 126 : if (readBytes != cp.length)
2782 : : {
2873 michael@paquier.xyz 2783 [ # # ]:UBC 0 : if (readBytes < 0)
2784 [ # # ]: 0 : ereport(PANIC,
2785 : : (errcode_for_file_access(),
2786 : : errmsg("could not read file \"%s\": %m", path)));
2787 : : else
2788 [ # # ]: 0 : ereport(PANIC,
2789 : : (errcode(ERRCODE_DATA_CORRUPTED),
2790 : : errmsg("could not read file \"%s\": read %d of %zu",
2791 : : path, readBytes, (Size) cp.length)));
2792 : : }
2793 : :
2520 peter@eisentraut.org 2794 [ - + ]:CBC 126 : if (CloseTransientFile(fd) != 0)
2639 michael@paquier.xyz 2795 [ # # ]:UBC 0 : ereport(PANIC,
2796 : : (errcode_for_file_access(),
2797 : : errmsg("could not close file \"%s\": %m", path)));
2798 : :
2799 : : /* now verify the CRC */
4225 heikki.linnakangas@i 2800 :CBC 126 : INIT_CRC32C(checksum);
2801 : 126 : COMP_CRC32C(checksum,
2802 : : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2803 : : ReplicationSlotOnDiskChecksummedSize);
4217 andres@anarazel.de 2804 : 126 : FIN_CRC32C(checksum);
2805 : :
4225 heikki.linnakangas@i 2806 [ - + ]: 126 : if (!EQ_CRC32C(checksum, cp.checksum))
4502 rhaas@postgresql.org 2807 [ # # ]:UBC 0 : ereport(PANIC,
2808 : : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2809 : : path, checksum, cp.checksum)));
2810 : :
2811 : : /*
2812 : : * If we crashed with an ephemeral slot active, don't restore but delete
2813 : : * it.
2814 : : */
4328 andres@anarazel.de 2815 [ - + ]:CBC 126 : if (cp.slotdata.persistency != RS_PERSISTENT)
2816 : : {
2827 michael@paquier.xyz 2817 [ # # ]:UBC 0 : if (!rmtree(slotdir, true))
2818 : : {
4328 andres@anarazel.de 2819 [ # # ]: 0 : ereport(WARNING,
2820 : : (errmsg("could not remove directory \"%s\"",
2821 : : slotdir)));
2822 : : }
638 michael@paquier.xyz 2823 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
4328 andres@anarazel.de 2824 : 0 : return;
2825 : : }
2826 : :
2827 : : /*
2828 : : * Verify that requirements for the specific slot type are met. That's
2829 : : * important because if these aren't met we're not guaranteed to retain
2830 : : * all the necessary resources for the slot.
2831 : : *
2832 : : * NB: We have to do so *after* the above checks for ephemeral slots,
2833 : : * because otherwise a slot that shouldn't exist anymore could prevent
2834 : : * restarts.
2835 : : *
2836 : : * NB: Changing the requirements here also requires adapting
2837 : : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2838 : : */
460 msawada@postgresql.o 2839 [ + + ]:CBC 126 : if (cp.slotdata.database != InvalidOid)
2840 : : {
158 msawada@postgresql.o 2841 [ + + ]:GNC 87 : if (wal_level < WAL_LEVEL_REPLICA)
460 msawada@postgresql.o 2842 [ + - ]:GBC 1 : ereport(FATAL,
2843 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2844 : : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2845 : : NameStr(cp.slotdata.name)),
2846 : : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2847 : :
2848 : : /*
2849 : : * In standby mode, the hot standby must be enabled. This check is
2850 : : * necessary to ensure logical slots are invalidated when they become
2851 : : * incompatible due to insufficient wal_level. Otherwise, if the
2852 : : * primary reduces effective_wal_level < logical while hot standby is
2853 : : * disabled, primary disable logical decoding while hot standby is
2854 : : * disabled, logical slots would remain valid even after promotion.
2855 : : */
460 msawada@postgresql.o 2856 [ + + + + ]:CBC 86 : if (StandbyMode && !EnableHotStandby)
2857 [ + - ]: 1 : ereport(FATAL,
2858 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2859 : : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2860 : : NameStr(cp.slotdata.name)),
2861 : : errhint("Change \"hot_standby\" to be \"on\".")));
2862 : : }
2768 andres@anarazel.de 2863 [ - + ]: 39 : else if (wal_level < WAL_LEVEL_REPLICA)
2768 andres@anarazel.de 2864 [ # # ]:UBC 0 : ereport(FATAL,
2865 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2866 : : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2867 : : NameStr(cp.slotdata.name)),
2868 : : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2869 : :
2870 : : /*
2871 : : * Nothing can be active yet, don't lock anything. Note we iterate up to
2872 : : * max_replication_slots instead of adding max_repack_replication_slots as
2873 : : * in all other places, because we must enforce the GUC value in case
2874 : : * there were more slots before the shutdown than what it is set up to
2875 : : * now.
2876 : : */
4502 rhaas@postgresql.org 2877 [ + - ]:CBC 182 : for (i = 0; i < max_replication_slots; i++)
2878 : : {
2879 : : ReplicationSlot *slot;
2880 : :
2881 : 182 : slot = &ReplicationSlotCtl->replication_slots[i];
2882 : :
2883 [ + + ]: 182 : if (slot->in_use)
2884 : 58 : continue;
2885 : :
2886 : : /* restore the entire set of persistent data */
2887 : 124 : memcpy(&slot->data, &cp.slotdata,
2888 : : sizeof(ReplicationSlotPersistentData));
2889 : :
2890 : : /* initialize in memory state */
2891 : 124 : slot->effective_xmin = cp.slotdata.xmin;
4471 2892 : 124 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
989 akapila@postgresql.o 2893 : 124 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
350 akorotkov@postgresql 2894 : 124 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2895 : :
4471 rhaas@postgresql.org 2896 : 124 : slot->candidate_catalog_xmin = InvalidTransactionId;
2897 : 124 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2898 : 124 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2899 : 124 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2900 : :
4502 2901 : 124 : slot->in_use = true;
109 heikki.linnakangas@i 2902 :GNC 124 : slot->active_proc = INVALID_PROC_NUMBER;
2903 : :
2904 : : /*
2905 : : * Set the time since the slot has become inactive after loading the
2906 : : * slot from the disk into memory. Whoever acquires the slot i.e.
2907 : : * makes the slot active will reset it. Use the same inactive_since
2908 : : * time for all the slots.
2909 : : */
479 akapila@postgresql.o 2910 [ + - ]:CBC 124 : if (now == 0)
2911 : 124 : now = GetCurrentTimestamp();
2912 : :
2913 : 124 : ReplicationSlotSetInactiveSince(slot, now, false);
2914 : :
4502 rhaas@postgresql.org 2915 : 124 : restored = true;
2916 : 124 : break;
2917 : : }
2918 : :
2919 [ - + ]: 124 : if (!restored)
2766 michael@paquier.xyz 2920 [ # # ]:UBC 0 : ereport(FATAL,
2921 : : (errmsg("too many replication slots active before shutdown"),
2922 : : errhint("Increase \"max_replication_slots\" and try again.")));
2923 : : }
2924 : :
2925 : : /*
2926 : : * Maps an invalidation reason for a replication slot to
2927 : : * ReplicationSlotInvalidationCause.
2928 : : */
2929 : : ReplicationSlotInvalidationCause
465 akapila@postgresql.o 2930 : 0 : GetSlotInvalidationCause(const char *cause_name)
2931 : : {
2932 [ # # ]: 0 : Assert(cause_name);
2933 : :
2934 : : /* Search lookup table for the cause having this name */
2935 [ # # ]: 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2936 : : {
2937 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2938 : 0 : return SlotInvalidationCauses[i].cause;
2939 : : }
2940 : :
2941 : 0 : Assert(false);
2942 : : return RS_INVAL_NONE; /* to keep compiler quiet */
2943 : : }
2944 : :
2945 : : /*
2946 : : * Maps a ReplicationSlotInvalidationCause to the invalidation
2947 : : * reason for a replication slot.
2948 : : */
2949 : : const char *
465 akapila@postgresql.o 2950 :CBC 44 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2951 : : {
2952 : : /* Search lookup table for the name of this cause */
2953 [ + - ]: 141 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2954 : : {
2955 [ + + ]: 141 : if (SlotInvalidationCauses[i].cause == cause)
2956 : 44 : return SlotInvalidationCauses[i].cause_name;
2957 : : }
2958 : :
465 akapila@postgresql.o 2959 :UBC 0 : Assert(false);
2960 : : return "none"; /* to keep compiler quiet */
2961 : : }
2962 : :
2963 : : /*
2964 : : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2965 : : *
2966 : : * The rawname will be parsed, and the result will be saved into *elemlist.
2967 : : */
2968 : : static bool
698 akapila@postgresql.o 2969 :CBC 28 : validate_sync_standby_slots(char *rawname, List **elemlist)
2970 : : {
2971 : : /* Verify syntax and parse string into a list of identifiers */
215 2972 [ - + ]: 28 : if (!SplitIdentifierString(rawname, ',', elemlist))
2973 : : {
813 akapila@postgresql.o 2974 :UBC 0 : GUC_check_errdetail("List syntax is invalid.");
215 2975 : 0 : return false;
2976 : : }
2977 : :
2978 : : /* Iterate the list to validate each slot name */
215 akapila@postgresql.o 2979 [ + - + + :CBC 80 : foreach_ptr(char, name, *elemlist)
+ + ]
2980 : : {
2981 : : int err_code;
2982 : 28 : char *err_msg = NULL;
2983 : 28 : char *err_hint = NULL;
2984 : :
215 akapila@postgresql.o 2985 [ + + ]:GNC 28 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2986 : : &err_msg, &err_hint))
2987 : : {
215 akapila@postgresql.o 2988 :CBC 2 : GUC_check_errcode(err_code);
2989 : 2 : GUC_check_errdetail("%s", err_msg);
2990 [ + - ]: 2 : if (err_hint != NULL)
2991 : 2 : GUC_check_errhint("%s", err_hint);
2992 : 2 : return false;
2993 : : }
2994 : : }
2995 : :
2996 : 26 : return true;
2997 : : }
2998 : :
2999 : : /*
3000 : : * GUC check_hook for synchronized_standby_slots
3001 : : */
3002 : : bool
698 3003 : 1333 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
3004 : : {
3005 : : char *rawname;
3006 : : char *ptr;
3007 : : List *elemlist;
3008 : : int size;
3009 : : bool ok;
3010 : : SyncStandbySlotsConfigData *config;
3011 : :
813 3012 [ + + ]: 1333 : if ((*newval)[0] == '\0')
3013 : 1305 : return true;
3014 : :
3015 : : /* Need a modifiable copy of the GUC string */
3016 : 28 : rawname = pstrdup(*newval);
3017 : :
3018 : : /* Now verify if the specified slots exist and have correct type */
698 3019 : 28 : ok = validate_sync_standby_slots(rawname, &elemlist);
3020 : :
813 3021 [ + + - + ]: 28 : if (!ok || elemlist == NIL)
3022 : : {
3023 : 2 : pfree(rawname);
3024 : 2 : list_free(elemlist);
3025 : 2 : return ok;
3026 : : }
3027 : :
3028 : : /* Compute the size required for the SyncStandbySlotsConfigData struct */
698 3029 : 26 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
813 3030 [ + - + + : 78 : foreach_ptr(char, slot_name, elemlist)
+ + ]
3031 : 26 : size += strlen(slot_name) + 1;
3032 : :
3033 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
698 3034 : 26 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
429 dgustafsson@postgres 3035 [ - + ]: 26 : if (!config)
429 dgustafsson@postgres 3036 :UBC 0 : return false;
3037 : :
3038 : : /* Transform the data into SyncStandbySlotsConfigData */
813 akapila@postgresql.o 3039 :CBC 26 : config->nslotnames = list_length(elemlist);
3040 : :
3041 : 26 : ptr = config->slot_names;
3042 [ + - + + : 78 : foreach_ptr(char, slot_name, elemlist)
+ + ]
3043 : : {
3044 : 26 : strcpy(ptr, slot_name);
3045 : 26 : ptr += strlen(slot_name) + 1;
3046 : : }
3047 : :
548 peter@eisentraut.org 3048 : 26 : *extra = config;
3049 : :
813 akapila@postgresql.o 3050 : 26 : pfree(rawname);
3051 : 26 : list_free(elemlist);
3052 : 26 : return true;
3053 : : }
3054 : :
3055 : : /*
3056 : : * GUC assign_hook for synchronized_standby_slots
3057 : : */
3058 : : void
698 3059 : 1330 : assign_synchronized_standby_slots(const char *newval, void *extra)
3060 : : {
3061 : : /*
3062 : : * The standby slots may have changed, so we must recompute the oldest
3063 : : * LSN.
3064 : : */
813 3065 : 1330 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3066 : :
698 3067 : 1330 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
813 3068 : 1330 : }
3069 : :
3070 : : /*
3071 : : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3072 : : */
3073 : : bool
698 3074 : 70374 : SlotExistsInSyncStandbySlots(const char *slot_name)
3075 : : {
3076 : : const char *standby_slot_name;
3077 : :
3078 : : /* Return false if there is no value in synchronized_standby_slots */
3079 [ + + ]: 70374 : if (synchronized_standby_slots_config == NULL)
813 3080 : 70358 : return false;
3081 : :
3082 : : /*
3083 : : * XXX: We are not expecting this list to be long so a linear search
3084 : : * shouldn't hurt but if that turns out not to be true then we can cache
3085 : : * this information for each WalSender as well.
3086 : : */
698 3087 : 16 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3088 [ + + ]: 24 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3089 : : {
813 3090 [ + + ]: 16 : if (strcmp(standby_slot_name, slot_name) == 0)
3091 : 8 : return true;
3092 : :
813 akapila@postgresql.o 3093 :GBC 8 : standby_slot_name += strlen(standby_slot_name) + 1;
3094 : : }
3095 : :
3096 : 8 : return false;
3097 : : }
3098 : :
3099 : : /*
3100 : : * Return true if the slots specified in synchronized_standby_slots have caught up to
3101 : : * the given WAL location, false otherwise.
3102 : : *
3103 : : * The elevel parameter specifies the error level used for logging messages
3104 : : * related to slots that do not exist, are invalidated, or are inactive.
3105 : : */
3106 : : bool
813 akapila@postgresql.o 3107 :CBC 2157 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3108 : : {
3109 : : const char *name;
3110 : 2157 : int caught_up_slot_num = 0;
3111 : 2157 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3112 : :
3113 : : /*
3114 : : * Don't need to wait for the standbys to catch up if there is no value in
3115 : : * synchronized_standby_slots.
3116 : : */
698 3117 [ + + ]: 2157 : if (synchronized_standby_slots_config == NULL)
813 3118 : 2128 : return true;
3119 : :
3120 : : /*
3121 : : * Don't need to wait for the standbys to catch up if we are on a standby
3122 : : * server, since we do not support syncing slots to cascading standbys.
3123 : : */
3124 [ - + ]: 29 : if (RecoveryInProgress())
813 akapila@postgresql.o 3125 :UBC 0 : return true;
3126 : :
3127 : : /*
3128 : : * Don't need to wait for the standbys to catch up if they are already
3129 : : * beyond the specified WAL location.
3130 : : */
205 alvherre@kurilemu.de 3131 [ + + ]:GNC 29 : if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
813 akapila@postgresql.o 3132 [ + + ]:CBC 18 : ss_oldest_flush_lsn >= wait_for_lsn)
3133 : 9 : return true;
3134 : :
3135 : : /*
3136 : : * To prevent concurrent slot dropping and creation while filtering the
3137 : : * slots, take the ReplicationSlotControlLock outside of the loop.
3138 : : */
3139 : 20 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3140 : :
698 3141 : 20 : name = synchronized_standby_slots_config->slot_names;
3142 [ + + ]: 27 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3143 : : {
3144 : : XLogRecPtr restart_lsn;
3145 : : bool invalidated;
3146 : : bool inactive;
3147 : : ReplicationSlot *slot;
3148 : :
813 3149 : 20 : slot = SearchNamedReplicationSlot(name, false);
3150 : :
3151 : : /*
3152 : : * If a slot name provided in synchronized_standby_slots does not
3153 : : * exist, report a message and exit the loop.
3154 : : */
3155 [ - + ]: 20 : if (!slot)
3156 : : {
813 akapila@postgresql.o 3157 [ # # ]:UBC 0 : ereport(elevel,
3158 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3159 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3160 : : name, "synchronized_standby_slots"),
3161 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3162 : : name),
3163 : : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3164 : : name, "synchronized_standby_slots"));
3165 : 0 : break;
3166 : : }
3167 : :
3168 : : /* Same as above: if a slot is not physical, exit the loop. */
813 akapila@postgresql.o 3169 [ - + ]:CBC 20 : if (SlotIsLogical(slot))
3170 : : {
813 akapila@postgresql.o 3171 [ # # ]:UBC 0 : ereport(elevel,
3172 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3173 : : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3174 : : name, "synchronized_standby_slots"),
3175 : : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3176 : : name),
3177 : : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3178 : : name, "synchronized_standby_slots"));
3179 : 0 : break;
3180 : : }
3181 : :
813 akapila@postgresql.o 3182 [ - + ]:CBC 20 : SpinLockAcquire(&slot->mutex);
3183 : 20 : restart_lsn = slot->data.restart_lsn;
3184 : 20 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
109 heikki.linnakangas@i 3185 :GNC 20 : inactive = slot->active_proc == INVALID_PROC_NUMBER;
813 akapila@postgresql.o 3186 :CBC 20 : SpinLockRelease(&slot->mutex);
3187 : :
3188 [ - + ]: 20 : if (invalidated)
3189 : : {
3190 : : /* Specified physical slot has been invalidated */
813 akapila@postgresql.o 3191 [ # # ]:UBC 0 : ereport(elevel,
3192 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3193 : : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3194 : : name, "synchronized_standby_slots"),
3195 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3196 : : name),
3197 : : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3198 : : name, "synchronized_standby_slots"));
3199 : 0 : break;
3200 : : }
3201 : :
205 alvherre@kurilemu.de 3202 [ + + + + ]:GNC 20 : if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3203 : : {
3204 : : /* Log a message if no active_pid for this physical slot */
813 akapila@postgresql.o 3205 [ + + ]:CBC 13 : if (inactive)
3206 [ + - ]: 8 : ereport(elevel,
3207 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3208 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3209 : : name, "synchronized_standby_slots"),
3210 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3211 : : name),
3212 : : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3213 : : name, "synchronized_standby_slots"));
3214 : :
3215 : : /* Continue if the current slot hasn't caught up. */
3216 : 12 : break;
3217 : : }
3218 : :
3219 [ - + ]: 7 : Assert(restart_lsn >= wait_for_lsn);
3220 : :
205 alvherre@kurilemu.de 3221 [ - + - - ]:GNC 7 : if (!XLogRecPtrIsValid(min_restart_lsn) ||
3222 : : min_restart_lsn > restart_lsn)
813 akapila@postgresql.o 3223 :CBC 7 : min_restart_lsn = restart_lsn;
3224 : :
3225 : 7 : caught_up_slot_num++;
3226 : :
3227 : 7 : name += strlen(name) + 1;
3228 : : }
3229 : :
3230 : 19 : LWLockRelease(ReplicationSlotControlLock);
3231 : :
3232 : : /*
3233 : : * Return false if not all the standbys have caught up to the specified
3234 : : * WAL location.
3235 : : */
698 3236 [ + + ]: 19 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
813 3237 : 12 : return false;
3238 : :
3239 : : /* The ss_oldest_flush_lsn must not retreat. */
205 alvherre@kurilemu.de 3240 [ + + - + ]:GNC 7 : Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
3241 : : min_restart_lsn >= ss_oldest_flush_lsn);
3242 : :
813 akapila@postgresql.o 3243 :CBC 7 : ss_oldest_flush_lsn = min_restart_lsn;
3244 : :
3245 : 7 : return true;
3246 : : }
3247 : :
3248 : : /*
3249 : : * Wait for physical standbys to confirm receiving the given lsn.
3250 : : *
3251 : : * Used by logical decoding SQL functions. It waits for physical standbys
3252 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3253 : : */
3254 : : void
3255 : 235 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3256 : : {
3257 : : /*
3258 : : * Don't need to wait for the standby to catch up if the current acquired
3259 : : * slot is not a logical failover slot, or there is no value in
3260 : : * synchronized_standby_slots.
3261 : : */
698 3262 [ + + + + ]: 235 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
813 3263 : 234 : return;
3264 : :
3265 : 1 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3266 : :
3267 : : for (;;)
3268 : : {
3269 [ - + ]: 2 : CHECK_FOR_INTERRUPTS();
3270 : :
3271 [ + + ]: 2 : if (ConfigReloadPending)
3272 : : {
3273 : 1 : ConfigReloadPending = false;
3274 : 1 : ProcessConfigFile(PGC_SIGHUP);
3275 : : }
3276 : :
3277 : : /* Exit if done waiting for every slot. */
3278 [ + + ]: 2 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3279 : 1 : break;
3280 : :
3281 : : /*
3282 : : * Wait for the slots in the synchronized_standby_slots to catch up,
3283 : : * but use a timeout (1s) so we can also check if the
3284 : : * synchronized_standby_slots has been changed.
3285 : : */
3286 : 1 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3287 : : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3288 : : }
3289 : :
3290 : 1 : ConditionVariableCancelSleep();
3291 : : }
|