Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the directory
24 : : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : : * contain the slot's own data. Additional data can be stored alongside that
26 : : * file if required. While the server is running, the state data is also
27 : : * cached in memory for efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/logicallauncher.h"
51 : : #include "replication/slotsync.h"
52 : : #include "replication/slot.h"
53 : : #include "replication/walsender_private.h"
54 : : #include "storage/fd.h"
55 : : #include "storage/ipc.h"
56 : : #include "storage/proc.h"
57 : : #include "storage/procarray.h"
58 : : #include "utils/builtins.h"
59 : : #include "utils/guc_hooks.h"
60 : : #include "utils/injection_point.h"
61 : : #include "utils/varlena.h"
62 : : #include "utils/wait_event.h"
63 : :
64 : : /*
65 : : * Replication slot on-disk data structure.
66 : : */
67 : : typedef struct ReplicationSlotOnDisk
68 : : {
69 : : /* first part of this struct needs to be version independent */
70 : :
71 : : /* data not covered by checksum */
72 : : uint32 magic;
73 : : pg_crc32c checksum;
74 : :
75 : : /* data covered by checksum */
76 : : uint32 version;
77 : : uint32 length;
78 : :
79 : : /*
80 : : * The actual data in the slot that follows can differ based on the above
81 : : * 'version'.
82 : : */
83 : :
84 : : ReplicationSlotPersistentData slotdata;
85 : : } ReplicationSlotOnDisk;
86 : :
87 : : /*
88 : : * Struct for the configuration of synchronized_standby_slots.
89 : : *
90 : : * Note: this must be a flat representation that can be held in a single chunk
91 : : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
92 : : * synchronized_standby_slots GUC.
93 : : */
94 : : typedef struct
95 : : {
96 : : /* Number of slot names in the slot_names[] */
97 : : int nslotnames;
98 : :
99 : : /*
100 : : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
101 : : */
102 : : char slot_names[FLEXIBLE_ARRAY_MEMBER];
103 : : } SyncStandbySlotsConfigData;
104 : :
105 : : /*
106 : : * Lookup table for slot invalidation causes.
107 : : */
108 : : typedef struct SlotInvalidationCauseMap
109 : : {
110 : : ReplicationSlotInvalidationCause cause;
111 : : const char *cause_name;
112 : : } SlotInvalidationCauseMap;
113 : :
114 : : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
115 : : {RS_INVAL_NONE, "none"},
116 : : {RS_INVAL_WAL_REMOVED, "wal_removed"},
117 : : {RS_INVAL_HORIZON, "rows_removed"},
118 : : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
119 : : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
120 : : };
121 : :
122 : : /*
123 : : * Ensure that the lookup table is up-to-date with the enums defined in
124 : : * ReplicationSlotInvalidationCause.
125 : : */
126 : : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
127 : : "array length mismatch");
128 : :
129 : : /* size of version independent data */
130 : : #define ReplicationSlotOnDiskConstantSize \
131 : : offsetof(ReplicationSlotOnDisk, slotdata)
132 : : /* size of the part of the slot not covered by the checksum */
133 : : #define ReplicationSlotOnDiskNotChecksummedSize \
134 : : offsetof(ReplicationSlotOnDisk, version)
135 : : /* size of the part covered by the checksum */
136 : : #define ReplicationSlotOnDiskChecksummedSize \
137 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
138 : : /* size of the slot data that is version dependent */
139 : : #define ReplicationSlotOnDiskV2Size \
140 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
141 : :
142 : : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
143 : : #define SLOT_VERSION 5 /* version for new files */
144 : :
145 : : /* Control array for replication slot management */
146 : : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
147 : :
148 : : /* My backend's replication slot in the shared memory array */
149 : : ReplicationSlot *MyReplicationSlot = NULL;
150 : :
151 : : /* GUC variables */
152 : : int max_replication_slots = 10; /* the maximum number of replication
153 : : * slots */
154 : :
155 : : /*
156 : : * Invalidate replication slots that have remained idle longer than this
157 : : * duration; '0' disables it.
158 : : */
159 : : int idle_replication_slot_timeout_secs = 0;
160 : :
161 : : /*
162 : : * This GUC lists streaming replication standby server slot names that
163 : : * logical WAL sender processes will wait for.
164 : : */
165 : : char *synchronized_standby_slots;
166 : :
167 : : /* This is the parsed and cached configuration for synchronized_standby_slots */
168 : : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
169 : :
170 : : /*
171 : : * Oldest LSN that has been confirmed to be flushed to the standbys
172 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
173 : : */
174 : : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
175 : :
176 : : static void ReplicationSlotShmemExit(int code, Datum arg);
177 : : static bool IsSlotForConflictCheck(const char *name);
178 : : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
179 : :
180 : : /* internal persistency functions */
181 : : static void RestoreSlotFromDisk(const char *name);
182 : : static void CreateSlotOnDisk(ReplicationSlot *slot);
183 : : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
184 : :
185 : : /*
186 : : * Report shared-memory space needed by ReplicationSlotsShmemInit.
187 : : */
188 : : Size
4426 rhaas@postgresql.org 189 :CBC 4445 : ReplicationSlotsShmemSize(void)
190 : : {
191 : 4445 : Size size = 0;
192 : :
193 [ + + ]: 4445 : if (max_replication_slots == 0)
4426 rhaas@postgresql.org 194 :GBC 2 : return size;
195 : :
4426 rhaas@postgresql.org 196 :CBC 4443 : size = offsetof(ReplicationSlotCtlData, replication_slots);
197 : 4443 : size = add_size(size,
198 : : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
199 : :
200 : 4443 : return size;
201 : : }
202 : :
203 : : /*
204 : : * Allocate and initialize shared memory for replication slots.
205 : : */
206 : : void
207 : 1150 : ReplicationSlotsShmemInit(void)
208 : : {
209 : : bool found;
210 : :
211 [ + + ]: 1150 : if (max_replication_slots == 0)
4426 rhaas@postgresql.org 212 :GBC 1 : return;
213 : :
4426 rhaas@postgresql.org 214 :CBC 1149 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
215 : 1149 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
216 : : &found);
217 : :
218 [ + - ]: 1149 : if (!found)
219 : : {
220 : : int i;
221 : :
222 : : /* First time through, so initialize */
223 [ + - + - : 2259 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+ - + + +
+ ]
224 : :
225 [ + + ]: 12447 : for (i = 0; i < max_replication_slots; i++)
226 : : {
227 : 11298 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
228 : :
229 : : /* everything else is zeroed by the memset above */
33 heikki.linnakangas@i 230 :GNC 11298 : slot->active_proc = INVALID_PROC_NUMBER;
4426 rhaas@postgresql.org 231 :CBC 11298 : SpinLockInit(&slot->mutex);
2130 tgl@sss.pgh.pa.us 232 : 11298 : LWLockInitialize(&slot->io_in_progress_lock,
233 : : LWTRANCHE_REPLICATION_SLOT_IO);
3155 alvherre@alvh.no-ip. 234 : 11298 : ConditionVariableInit(&slot->active_cv);
235 : : }
236 : : }
237 : : }
238 : :
239 : : /*
240 : : * Register the callback for replication slot cleanup and releasing.
241 : : */
242 : : void
1490 andres@anarazel.de 243 : 21541 : ReplicationSlotInitialize(void)
244 : : {
245 : 21541 : before_shmem_exit(ReplicationSlotShmemExit, 0);
246 : 21541 : }
247 : :
248 : : /*
249 : : * Release and cleanup replication slots.
250 : : */
251 : : static void
252 : 21541 : ReplicationSlotShmemExit(int code, Datum arg)
253 : : {
254 : : /* Make sure active replication slots are released */
255 [ + + ]: 21541 : if (MyReplicationSlot != NULL)
256 : 281 : ReplicationSlotRelease();
257 : :
258 : : /* Also cleanup all the temporary slots. */
689 akapila@postgresql.o 259 : 21541 : ReplicationSlotCleanup(false);
1490 andres@anarazel.de 260 : 21541 : }
261 : :
262 : : /*
263 : : * Check whether the passed slot name is valid and report errors at elevel.
264 : : *
265 : : * See comments for ReplicationSlotValidateNameInternal().
266 : : */
267 : : bool
144 fujii@postgresql.org 268 :GNC 839 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
269 : : int elevel)
270 : : {
271 : : int err_code;
144 fujii@postgresql.org 272 :CBC 839 : char *err_msg = NULL;
273 : 839 : char *err_hint = NULL;
274 : :
144 fujii@postgresql.org 275 [ + + ]:GNC 839 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
276 : : &err_code, &err_msg, &err_hint))
277 : : {
278 : : /*
279 : : * Use errmsg_internal() and errhint_internal() instead of errmsg()
280 : : * and errhint(), since the messages from
281 : : * ReplicationSlotValidateNameInternal() are already translated. This
282 : : * avoids double translation.
283 : : */
144 fujii@postgresql.org 284 [ + - + + ]:CBC 4 : ereport(elevel,
285 : : errcode(err_code),
286 : : errmsg_internal("%s", err_msg),
287 : : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
288 : :
144 fujii@postgresql.org 289 :UBC 0 : pfree(err_msg);
290 [ # # ]: 0 : if (err_hint != NULL)
291 : 0 : pfree(err_hint);
292 : 0 : return false;
293 : : }
294 : :
144 fujii@postgresql.org 295 :CBC 835 : return true;
296 : : }
297 : :
298 : : /*
299 : : * Check whether the passed slot name is valid.
300 : : *
301 : : * An error will be reported for a reserved replication slot name if
302 : : * allow_reserved_name is set to false.
303 : : *
304 : : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
305 : : * the name to be used as a directory name on every supported OS.
306 : : *
307 : : * Returns true if the slot name is valid. Otherwise, returns false and stores
308 : : * the error code, error message, and optional hint in err_code, err_msg, and
309 : : * err_hint, respectively. The caller is responsible for freeing err_msg and
310 : : * err_hint, which are palloc'd.
311 : : */
312 : : bool
144 fujii@postgresql.org 313 :GNC 1059 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
314 : : int *err_code, char **err_msg, char **err_hint)
315 : : {
316 : : const char *cp;
317 : :
4426 rhaas@postgresql.org 318 [ + + ]:CBC 1059 : if (strlen(name) == 0)
319 : : {
144 fujii@postgresql.org 320 : 3 : *err_code = ERRCODE_INVALID_NAME;
321 : 3 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
322 : 3 : *err_hint = NULL;
4426 rhaas@postgresql.org 323 : 3 : return false;
324 : : }
325 : :
326 [ - + ]: 1056 : if (strlen(name) >= NAMEDATALEN)
327 : : {
144 fujii@postgresql.org 328 :UBC 0 : *err_code = ERRCODE_NAME_TOO_LONG;
329 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
330 : 0 : *err_hint = NULL;
4426 rhaas@postgresql.org 331 : 0 : return false;
332 : : }
333 : :
4426 rhaas@postgresql.org 334 [ + + ]:CBC 20217 : for (cp = name; *cp; cp++)
335 : : {
336 [ + + - + ]: 19163 : if (!((*cp >= 'a' && *cp <= 'z')
337 [ + + + + ]: 8992 : || (*cp >= '0' && *cp <= '9')
338 [ + + ]: 1878 : || (*cp == '_')))
339 : : {
144 fujii@postgresql.org 340 : 2 : *err_code = ERRCODE_INVALID_NAME;
341 : 2 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
342 : 2 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
4426 rhaas@postgresql.org 343 : 2 : return false;
344 : : }
345 : : }
346 : :
235 akapila@postgresql.o 347 [ + + + + ]:GNC 1054 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
348 : : {
144 fujii@postgresql.org 349 : 1 : *err_code = ERRCODE_RESERVED_NAME;
350 : 1 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
351 : 1 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
352 : : CONFLICT_DETECTION_SLOT);
235 akapila@postgresql.o 353 : 1 : return false;
354 : : }
355 : :
4426 rhaas@postgresql.org 356 :CBC 1053 : return true;
357 : : }
358 : :
359 : : /*
360 : : * Return true if the replication slot name is "pg_conflict_detection".
361 : : */
362 : : static bool
235 akapila@postgresql.o 363 :GNC 2277 : IsSlotForConflictCheck(const char *name)
364 : : {
365 : 2277 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
366 : : }
367 : :
368 : : /*
369 : : * Create a new replication slot and mark it as used by this backend.
370 : : *
371 : : * name: Name of the slot
372 : : * db_specific: logical decoding is db specific; if the slot is going to
373 : : * be used for that pass true, otherwise false.
374 : : * two_phase: If enabled, allows decoding of prepared transactions.
375 : : * failover: If enabled, allows the slot to be synced to standbys so
376 : : * that logical replication can be resumed after failover.
377 : : * synced: True if the slot is synchronized from the primary server.
378 : : */
379 : : void
4395 rhaas@postgresql.org 380 :CBC 699 : ReplicationSlotCreate(const char *name, bool db_specific,
381 : : ReplicationSlotPersistency persistency,
382 : : bool two_phase, bool failover, bool synced)
383 : : {
4426 384 : 699 : ReplicationSlot *slot = NULL;
385 : : int i;
386 : :
387 [ - + ]: 699 : Assert(MyReplicationSlot == NULL);
388 : :
389 : : /*
390 : : * The logical launcher or pg_upgrade may create or migrate an internal
391 : : * slot, so using a reserved name is allowed in these cases.
392 : : */
235 akapila@postgresql.o 393 [ + + + + ]:GNC 699 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
394 : : ERROR);
395 : :
760 akapila@postgresql.o 396 [ + + ]:CBC 698 : if (failover)
397 : : {
398 : : /*
399 : : * Do not allow users to create the failover enabled slots on the
400 : : * standby as we do not support sync to the cascading standby.
401 : : *
402 : : * However, failover enabled slots can be created during slot
403 : : * synchronization because we need to retain the same values as the
404 : : * remote slot.
405 : : */
406 [ + + - + ]: 28 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
760 akapila@postgresql.o 407 [ # # ]:UBC 0 : ereport(ERROR,
408 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
409 : : errmsg("cannot enable failover for a replication slot created on the standby"));
410 : :
411 : : /*
412 : : * Do not allow users to create failover enabled temporary slots,
413 : : * because temporary slots will not be synced to the standby.
414 : : *
415 : : * However, failover enabled temporary slots can be created during
416 : : * slot synchronization. See the comments atop slotsync.c for details.
417 : : */
760 akapila@postgresql.o 418 [ + + + + ]:CBC 28 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
419 [ + - ]: 1 : ereport(ERROR,
420 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
421 : : errmsg("cannot enable failover for a temporary replication slot"));
422 : : }
423 : :
424 : : /*
425 : : * If some other backend ran this code concurrently with us, we'd likely
426 : : * both allocate the same slot, and that would be bad. We'd also be at
427 : : * risk of missing a name collision. Also, we don't want to try to create
428 : : * a new slot while somebody's busy cleaning up an old one, because we
429 : : * might both be monkeying with the same directory.
430 : : */
4426 rhaas@postgresql.org 431 : 697 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
432 : :
433 : : /*
434 : : * Check for name collision, and identify an allocatable slot. We need to
435 : : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
436 : : * else can change the in_use flags while we're looking at them.
437 : : */
438 : 697 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
439 [ + + ]: 6759 : for (i = 0; i < max_replication_slots; i++)
440 : : {
441 : 6065 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
442 : :
443 [ + + + + ]: 6065 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
444 [ + - ]: 3 : ereport(ERROR,
445 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
446 : : errmsg("replication slot \"%s\" already exists", name)));
447 [ + + + + ]: 6062 : if (!s->in_use && slot == NULL)
448 : 693 : slot = s;
449 : : }
450 : 694 : LWLockRelease(ReplicationSlotControlLock);
451 : :
452 : : /* If all slots are in use, we're out of luck. */
453 [ + + ]: 694 : if (slot == NULL)
454 [ + - ]: 1 : ereport(ERROR,
455 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
456 : : errmsg("all replication slots are in use"),
457 : : errhint("Free one or increase \"max_replication_slots\".")));
458 : :
459 : : /*
460 : : * Since this slot is not in use, nobody should be looking at any part of
461 : : * it other than the in_use field unless they're trying to allocate it.
462 : : * And since we hold ReplicationSlotAllocationLock, nobody except us can
463 : : * be doing that. So it's safe to initialize the slot.
464 : : */
465 [ - + ]: 693 : Assert(!slot->in_use);
33 heikki.linnakangas@i 466 [ - + ]:GNC 693 : Assert(slot->active_proc == INVALID_PROC_NUMBER);
467 : :
468 : : /* first initialize persistent data */
3497 andres@anarazel.de 469 :CBC 693 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
2043 peter@eisentraut.org 470 : 693 : namestrcpy(&slot->data.name, name);
4426 rhaas@postgresql.org 471 [ + + ]: 693 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
3497 andres@anarazel.de 472 : 693 : slot->data.persistency = persistency;
1838 akapila@postgresql.o 473 : 693 : slot->data.two_phase = two_phase;
1705 474 : 693 : slot->data.two_phase_at = InvalidXLogRecPtr;
780 475 : 693 : slot->data.failover = failover;
760 476 : 693 : slot->data.synced = synced;
477 : :
478 : : /* and then data only present in shared memory */
3497 andres@anarazel.de 479 : 693 : slot->just_dirtied = false;
480 : 693 : slot->dirty = false;
481 : 693 : slot->effective_xmin = InvalidTransactionId;
482 : 693 : slot->effective_catalog_xmin = InvalidTransactionId;
483 : 693 : slot->candidate_catalog_xmin = InvalidTransactionId;
484 : 693 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
485 : 693 : slot->candidate_restart_valid = InvalidXLogRecPtr;
486 : 693 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
913 akapila@postgresql.o 487 : 693 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
274 akorotkov@postgresql 488 : 693 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
718 akapila@postgresql.o 489 : 693 : slot->inactive_since = 0;
107 akapila@postgresql.o 490 :GNC 693 : slot->slotsync_skip_reason = SS_SKIP_NONE;
491 : :
492 : : /*
493 : : * Create the slot on disk. We haven't actually marked the slot allocated
494 : : * yet, so no special cleanup is required if this errors out.
495 : : */
4426 rhaas@postgresql.org 496 :CBC 693 : CreateSlotOnDisk(slot);
497 : :
498 : : /*
499 : : * We need to briefly prevent any other backend from iterating over the
500 : : * slots while we flip the in_use flag. We also need to set the active
501 : : * flag while holding the ControlLock as otherwise a concurrent
502 : : * ReplicationSlotAcquire() could acquire the slot as well.
503 : : */
504 : 693 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
505 : :
506 : 693 : slot->in_use = true;
507 : :
508 : : /* We can now mark the slot active, and that makes it our slot. */
3813 509 [ - + ]: 693 : SpinLockAcquire(&slot->mutex);
33 heikki.linnakangas@i 510 [ - + ]:GNC 693 : Assert(slot->active_proc == INVALID_PROC_NUMBER);
511 : 693 : slot->active_proc = MyProcNumber;
3813 rhaas@postgresql.org 512 :CBC 693 : SpinLockRelease(&slot->mutex);
513 : 693 : MyReplicationSlot = slot;
514 : :
4426 515 : 693 : LWLockRelease(ReplicationSlotControlLock);
516 : :
517 : : /*
518 : : * Create statistics entry for the new logical slot. We don't collect any
519 : : * stats for physical slots, so no need to create an entry for the same.
520 : : * See ReplicationSlotDropPtr for why we need to do this before releasing
521 : : * ReplicationSlotAllocationLock.
522 : : */
1984 akapila@postgresql.o 523 [ + + ]: 693 : if (SlotIsLogical(slot))
1439 andres@anarazel.de 524 : 500 : pgstat_create_replslot(slot);
525 : :
526 : : /*
527 : : * Now that the slot has been marked as in_use and active, it's safe to
528 : : * let somebody else try to allocate a slot.
529 : : */
4426 rhaas@postgresql.org 530 : 693 : LWLockRelease(ReplicationSlotAllocationLock);
531 : :
532 : : /* Let everybody know we've modified this slot */
3155 alvherre@alvh.no-ip. 533 : 693 : ConditionVariableBroadcast(&slot->active_cv);
4426 rhaas@postgresql.org 534 : 693 : }
535 : :
536 : : /*
537 : : * Search for the named replication slot.
538 : : *
539 : : * Return the replication slot if found, otherwise NULL.
540 : : */
541 : : ReplicationSlot *
1783 akapila@postgresql.o 542 : 2018 : SearchNamedReplicationSlot(const char *name, bool need_lock)
543 : : {
544 : : int i;
545 : 2018 : ReplicationSlot *slot = NULL;
546 : :
547 [ + + ]: 2018 : if (need_lock)
548 : 583 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
549 : :
4426 rhaas@postgresql.org 550 [ + + ]: 7719 : for (i = 0; i < max_replication_slots; i++)
551 : : {
552 : 7238 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
553 : :
554 [ + + + + ]: 7238 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
555 : : {
556 : 1537 : slot = s;
557 : 1537 : break;
558 : : }
559 : : }
560 : :
1783 akapila@postgresql.o 561 [ + + ]: 2018 : if (need_lock)
562 : 583 : LWLockRelease(ReplicationSlotControlLock);
563 : :
2095 fujii@postgresql.org 564 : 2018 : return slot;
565 : : }
566 : :
567 : : /*
568 : : * Return the index of the replication slot in
569 : : * ReplicationSlotCtl->replication_slots.
570 : : *
571 : : * This is mainly useful to have an efficient key for storing replication slot
572 : : * stats.
573 : : */
574 : : int
1439 andres@anarazel.de 575 : 8482 : ReplicationSlotIndex(ReplicationSlot *slot)
576 : : {
577 [ + - - + ]: 8482 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
578 : : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
579 : :
580 : 8482 : return slot - ReplicationSlotCtl->replication_slots;
581 : : }
582 : :
583 : : /*
584 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
585 : : * the slot's name and true is returned.
586 : : *
587 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
588 : : * cases there are obvious TOCTOU issues.
589 : : */
590 : : bool
1254 591 : 107 : ReplicationSlotName(int index, Name name)
592 : : {
593 : : ReplicationSlot *slot;
594 : : bool found;
595 : :
596 : 107 : slot = &ReplicationSlotCtl->replication_slots[index];
597 : :
598 : : /*
599 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
600 : : * need the spinlock as the name of an existing slot cannot change.
601 : : */
602 : 107 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
603 : 107 : found = slot->in_use;
604 [ + - ]: 107 : if (slot->in_use)
605 : 107 : namestrcpy(name, NameStr(slot->data.name));
606 : 107 : LWLockRelease(ReplicationSlotControlLock);
607 : :
608 : 107 : return found;
609 : : }
610 : :
611 : : /*
612 : : * Find a previously created slot and mark it as used by this process.
613 : : *
614 : : * An error is raised if nowait is true and the slot is currently in use. If
615 : : * nowait is false, we sleep until the slot is released by the owning process.
616 : : *
617 : : * An error is raised if error_if_invalid is true and the slot is found to
618 : : * be invalid. It should always be set to true, except when we are temporarily
619 : : * acquiring the slot and don't intend to change it.
620 : : */
621 : : void
408 akapila@postgresql.o 622 : 1357 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
623 : : {
624 : : ReplicationSlot *s;
625 : : ProcNumber active_proc;
626 : : int active_pid;
627 : :
1234 peter@eisentraut.org 628 [ + - ]: 1357 : Assert(name != NULL);
629 : :
2095 fujii@postgresql.org 630 : 1357 : retry:
631 [ - + ]: 1357 : Assert(MyReplicationSlot == NULL);
632 : :
633 : 1357 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
634 : :
635 : : /* Check if the slot exists with the given name. */
1738 alvherre@alvh.no-ip. 636 : 1357 : s = SearchNamedReplicationSlot(name, false);
2095 fujii@postgresql.org 637 [ + + - + ]: 1357 : if (s == NULL || !s->in_use)
638 : : {
639 : 9 : LWLockRelease(ReplicationSlotControlLock);
640 : :
4426 rhaas@postgresql.org 641 [ + - ]: 9 : ereport(ERROR,
642 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
643 : : errmsg("replication slot \"%s\" does not exist",
644 : : name)));
645 : : }
646 : :
647 : : /*
648 : : * Do not allow users to acquire the reserved slot. This scenario may
649 : : * occur if the launcher that owns the slot has terminated unexpectedly
650 : : * due to an error, and a backend process attempts to reuse the slot.
651 : : */
235 akapila@postgresql.o 652 [ + + - + ]:GNC 1348 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
235 akapila@postgresql.o 653 [ # # ]:UNC 0 : ereport(ERROR,
654 : : errcode(ERRCODE_UNDEFINED_OBJECT),
655 : : errmsg("cannot acquire replication slot \"%s\"", name),
656 : : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
657 : :
658 : : /*
659 : : * This is the slot we want; check if it's active under some other
660 : : * process. In single user mode, we don't need this check.
661 : : */
2095 fujii@postgresql.org 662 [ + + ]:CBC 1348 : if (IsUnderPostmaster)
663 : : {
664 : : /*
665 : : * Get ready to sleep on the slot in case it is active. (We may end
666 : : * up not sleeping, but we don't want to do this while holding the
667 : : * spinlock.)
668 : : */
1738 alvherre@alvh.no-ip. 669 [ + + ]: 1343 : if (!nowait)
2095 fujii@postgresql.org 670 : 289 : ConditionVariablePrepareToSleep(&s->active_cv);
671 : :
672 : : /*
673 : : * It is important to reset the inactive_since under spinlock here to
674 : : * avoid race conditions with slot invalidation. See comments related
675 : : * to inactive_since in InvalidatePossiblyObsoleteSlot.
676 : : */
677 [ - + ]: 1343 : SpinLockAcquire(&s->mutex);
33 heikki.linnakangas@i 678 [ + + ]:GNC 1343 : if (s->active_proc == INVALID_PROC_NUMBER)
679 : 1194 : s->active_proc = MyProcNumber;
680 : 1343 : active_proc = s->active_proc;
389 akapila@postgresql.o 681 :CBC 1343 : ReplicationSlotSetInactiveSince(s, 0, false);
2095 fujii@postgresql.org 682 : 1343 : SpinLockRelease(&s->mutex);
683 : : }
684 : : else
685 : : {
33 heikki.linnakangas@i 686 :GNC 5 : s->active_proc = active_proc = MyProcNumber;
389 akapila@postgresql.o 687 :CBC 5 : ReplicationSlotSetInactiveSince(s, 0, true);
688 : : }
33 heikki.linnakangas@i 689 :GNC 1348 : active_pid = GetPGProcByNumber(active_proc)->pid;
2095 fujii@postgresql.org 690 :CBC 1348 : LWLockRelease(ReplicationSlotControlLock);
691 : :
692 : : /*
693 : : * If we found the slot but it's already active in another process, we
694 : : * wait until the owning process signals us that it's been released, or
695 : : * error out.
696 : : */
33 heikki.linnakangas@i 697 [ - + ]:GNC 1348 : if (active_proc != MyProcNumber)
698 : : {
1738 alvherre@alvh.no-ip. 699 [ # # ]:UBC 0 : if (!nowait)
700 : : {
701 : : /* Wait here until we get signaled, and then restart */
1735 702 : 0 : ConditionVariableSleep(&s->active_cv,
703 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
704 : 0 : ConditionVariableCancelSleep();
705 : 0 : goto retry;
706 : : }
707 : :
708 [ # # ]: 0 : ereport(ERROR,
709 : : (errcode(ERRCODE_OBJECT_IN_USE),
710 : : errmsg("replication slot \"%s\" is active for PID %d",
711 : : NameStr(s->data.name), active_pid)));
712 : : }
1738 alvherre@alvh.no-ip. 713 [ + + ]:CBC 1348 : else if (!nowait)
1768 tgl@sss.pgh.pa.us 714 : 289 : ConditionVariableCancelSleep(); /* no sleep needed after all */
715 : :
716 : : /* We made this slot active, so it's ours now. */
2095 fujii@postgresql.org 717 : 1348 : MyReplicationSlot = s;
718 : :
719 : : /*
720 : : * We need to check for invalidation after making the slot ours to avoid
721 : : * the possible race condition with the checkpointer that can otherwise
722 : : * invalidate the slot immediately after the check.
723 : : */
381 akapila@postgresql.o 724 [ + + + + ]: 1348 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
725 [ + - ]: 7 : ereport(ERROR,
726 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
727 : : errmsg("can no longer access replication slot \"%s\"",
728 : : NameStr(s->data.name)),
729 : : errdetail("This replication slot has been invalidated due to \"%s\".",
730 : : GetSlotInvalidationCauseName(s->data.invalidated)));
731 : :
732 : : /* Let everybody know we've modified this slot */
733 : 1341 : ConditionVariableBroadcast(&s->active_cv);
734 : :
735 : : /*
736 : : * The call to pgstat_acquire_replslot() protects against stats for a
737 : : * different slot, from before a restart or such, being present during
738 : : * pgstat_report_replslot().
739 : : */
1439 andres@anarazel.de 740 [ + + ]: 1341 : if (SlotIsLogical(s))
741 : 1123 : pgstat_acquire_replslot(s);
742 : :
743 : :
845 akapila@postgresql.o 744 [ + + ]: 1341 : if (am_walsender)
745 : : {
746 [ + - + - : 918 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
747 : : SlotIsLogical(s)
748 : : ? errmsg("acquired logical replication slot \"%s\"",
749 : : NameStr(s->data.name))
750 : : : errmsg("acquired physical replication slot \"%s\"",
751 : : NameStr(s->data.name)));
752 : : }
4426 rhaas@postgresql.org 753 : 1341 : }
754 : :
755 : : /*
756 : : * Release the replication slot that this backend considers to own.
757 : : *
758 : : * This or another backend can re-acquire the slot later.
759 : : * Resources this slot requires will be preserved.
760 : : */
761 : : void
762 : 1626 : ReplicationSlotRelease(void)
763 : : {
764 : 1626 : ReplicationSlot *slot = MyReplicationSlot;
845 akapila@postgresql.o 765 : 1626 : char *slotname = NULL; /* keep compiler quiet */
766 : : bool is_logical;
720 767 : 1626 : TimestampTz now = 0;
768 : :
33 heikki.linnakangas@i 769 [ + - - + ]:GNC 1626 : Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
770 : :
82 msawada@postgresql.o 771 : 1626 : is_logical = SlotIsLogical(slot);
772 : :
845 akapila@postgresql.o 773 [ + + ]:CBC 1626 : if (am_walsender)
774 : 1137 : slotname = pstrdup(NameStr(slot->data.name));
775 : :
4395 rhaas@postgresql.org 776 [ + + ]: 1626 : if (slot->data.persistency == RS_EPHEMERAL)
777 : : {
778 : : /*
779 : : * Delete the slot. There is no !PANIC case where this is allowed to
780 : : * fail, all that may happen is an incomplete cleanup of the on-disk
781 : : * data.
782 : : */
783 : 6 : ReplicationSlotDropAcquired();
784 : :
785 : : /*
786 : : * Request to disable logical decoding, even though this slot may not
787 : : * have been the last logical slot. The checkpointer will verify if
788 : : * logical decoding should actually be disabled.
789 : : */
82 msawada@postgresql.o 790 [ + - ]:GNC 6 : if (is_logical)
791 : 6 : RequestDisableLogicalDecoding();
792 : : }
793 : :
794 : : /*
795 : : * If slot needed to temporarily restrain both data and catalog xmin to
796 : : * create the catalog snapshot, remove that temporary constraint.
797 : : * Snapshots can only be exported while the initial snapshot is still
798 : : * acquired.
799 : : */
3248 andres@anarazel.de 800 [ + + ]:CBC 1626 : if (!TransactionIdIsValid(slot->data.xmin) &&
801 [ + + ]: 1595 : TransactionIdIsValid(slot->effective_xmin))
802 : : {
803 [ - + ]: 208 : SpinLockAcquire(&slot->mutex);
804 : 208 : slot->effective_xmin = InvalidTransactionId;
805 : 208 : SpinLockRelease(&slot->mutex);
806 : 208 : ReplicationSlotsComputeRequiredXmin(false);
807 : : }
808 : :
809 : : /*
810 : : * Set the time since the slot has become inactive. We get the current
811 : : * time beforehand to avoid system call while holding the spinlock.
812 : : */
709 akapila@postgresql.o 813 : 1626 : now = GetCurrentTimestamp();
814 : :
3155 alvherre@alvh.no-ip. 815 [ + + ]: 1626 : if (slot->data.persistency == RS_PERSISTENT)
816 : : {
817 : : /*
818 : : * Mark persistent slot inactive. We're not freeing it, just
819 : : * disconnecting, but wake up others that may be waiting for it.
820 : : */
821 [ - + ]: 1320 : SpinLockAcquire(&slot->mutex);
33 heikki.linnakangas@i 822 :GNC 1320 : slot->active_proc = INVALID_PROC_NUMBER;
403 akapila@postgresql.o 823 :CBC 1320 : ReplicationSlotSetInactiveSince(slot, now, false);
3155 alvherre@alvh.no-ip. 824 : 1320 : SpinLockRelease(&slot->mutex);
825 : 1320 : ConditionVariableBroadcast(&slot->active_cv);
826 : : }
827 : : else
403 akapila@postgresql.o 828 : 306 : ReplicationSlotSetInactiveSince(slot, now, true);
829 : :
4395 rhaas@postgresql.org 830 : 1626 : MyReplicationSlot = NULL;
831 : :
832 : : /* might not have been set when we've been a plain slot */
1585 alvherre@alvh.no-ip. 833 : 1626 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
1945 834 : 1626 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
835 : 1626 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
4395 rhaas@postgresql.org 836 : 1626 : LWLockRelease(ProcArrayLock);
837 : :
845 akapila@postgresql.o 838 [ + + ]: 1626 : if (am_walsender)
839 : : {
840 [ + - + - : 1137 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
841 : : is_logical
842 : : ? errmsg("released logical replication slot \"%s\"",
843 : : slotname)
844 : : : errmsg("released physical replication slot \"%s\"",
845 : : slotname));
846 : :
847 : 1137 : pfree(slotname);
848 : : }
4426 rhaas@postgresql.org 849 : 1626 : }
850 : :
851 : : /*
852 : : * Cleanup temporary slots created in current session.
853 : : *
854 : : * Cleanup only synced temporary slots if 'synced_only' is true, else
855 : : * cleanup all temporary slots.
856 : : *
857 : : * If it drops the last logical slot in the cluster, requests to disable
858 : : * logical decoding.
859 : : */
860 : : void
689 akapila@postgresql.o 861 : 44481 : ReplicationSlotCleanup(bool synced_only)
862 : : {
863 : : int i;
864 : : bool found_valid_logicalslot;
82 msawada@postgresql.o 865 :GNC 44481 : bool dropped_logical = false;
866 : :
3384 peter_e@gmx.net 867 [ + - ]:CBC 44481 : Assert(MyReplicationSlot == NULL);
868 : :
3155 alvherre@alvh.no-ip. 869 : 44481 : restart:
82 msawada@postgresql.o 870 :GNC 44632 : found_valid_logicalslot = false;
3155 alvherre@alvh.no-ip. 871 :CBC 44632 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3384 peter_e@gmx.net 872 [ + + ]: 483518 : for (i = 0; i < max_replication_slots; i++)
873 : : {
874 : 439037 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
875 : :
3155 alvherre@alvh.no-ip. 876 [ + + ]: 439037 : if (!s->in_use)
877 : 424030 : continue;
878 : :
879 [ + + ]: 15007 : SpinLockAcquire(&s->mutex);
880 : :
82 msawada@postgresql.o 881 :GNC 30014 : found_valid_logicalslot |=
882 [ + + + + ]: 15007 : (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
883 : :
33 heikki.linnakangas@i 884 [ + + ]: 15007 : if ((s->active_proc == MyProcNumber &&
689 akapila@postgresql.o 885 [ - + - - ]:CBC 151 : (!synced_only || s->data.synced)))
886 : : {
3155 alvherre@alvh.no-ip. 887 [ - + ]: 151 : Assert(s->data.persistency == RS_TEMPORARY);
888 : 151 : SpinLockRelease(&s->mutex);
889 : 151 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890 : :
82 msawada@postgresql.o 891 [ + + ]:GNC 151 : if (SlotIsLogical(s))
892 : 10 : dropped_logical = true;
893 : :
3384 peter_e@gmx.net 894 :CBC 151 : ReplicationSlotDropPtr(s);
895 : :
3155 alvherre@alvh.no-ip. 896 : 151 : ConditionVariableBroadcast(&s->active_cv);
897 : 151 : goto restart;
898 : : }
899 : : else
900 : 14856 : SpinLockRelease(&s->mutex);
901 : : }
902 : :
903 : 44481 : LWLockRelease(ReplicationSlotControlLock);
904 : :
82 msawada@postgresql.o 905 [ + + + + ]:GNC 44481 : if (dropped_logical && !found_valid_logicalslot)
906 : 3 : RequestDisableLogicalDecoding();
3384 peter_e@gmx.net 907 :CBC 44481 : }
908 : :
909 : : /*
910 : : * Permanently drop replication slot identified by the passed in name.
911 : : */
912 : : void
3155 alvherre@alvh.no-ip. 913 : 436 : ReplicationSlotDrop(const char *name, bool nowait)
914 : : {
915 : : bool is_logical;
916 : :
4395 rhaas@postgresql.org 917 [ - + ]: 436 : Assert(MyReplicationSlot == NULL);
918 : :
408 akapila@postgresql.o 919 : 436 : ReplicationSlotAcquire(name, nowait, false);
920 : :
921 : : /*
922 : : * Do not allow users to drop the slots which are currently being synced
923 : : * from the primary to the standby.
924 : : */
760 925 [ + + + + ]: 429 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
926 [ + - ]: 1 : ereport(ERROR,
927 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
928 : : errmsg("cannot drop replication slot \"%s\"", name),
929 : : errdetail("This replication slot is being synchronized from the primary server."));
930 : :
82 msawada@postgresql.o 931 :GNC 428 : is_logical = SlotIsLogical(MyReplicationSlot);
932 : :
4395 rhaas@postgresql.org 933 :CBC 428 : ReplicationSlotDropAcquired();
934 : :
82 msawada@postgresql.o 935 [ + + ]:GNC 428 : if (is_logical)
936 : 408 : RequestDisableLogicalDecoding();
4395 rhaas@postgresql.org 937 :CBC 428 : }
938 : :
939 : : /*
940 : : * Change the definition of the slot identified by the specified name.
941 : : *
942 : : * Altering the two_phase property of a slot requires caution on the
943 : : * client-side. Enabling it at any random point during decoding has the
944 : : * risk that transactions prepared before this change may be skipped by
945 : : * the decoder, leading to missing prepare records on the client. So, we
946 : : * enable it for subscription related slots only once the initial tablesync
947 : : * is finished. See comments atop worker.c. Disabling it is safe only when
948 : : * there are no pending prepared transaction, otherwise, the changes of
949 : : * already prepared transactions can be replicated again along with their
950 : : * corresponding commit leading to duplicate data or errors.
951 : : */
952 : : void
599 akapila@postgresql.o 953 : 7 : ReplicationSlotAlter(const char *name, const bool *failover,
954 : : const bool *two_phase)
955 : : {
956 : 7 : bool update_slot = false;
957 : :
776 958 [ - + ]: 7 : Assert(MyReplicationSlot == NULL);
599 959 [ + + - + ]: 7 : Assert(failover || two_phase);
960 : :
408 961 : 7 : ReplicationSlotAcquire(name, false, true);
962 : :
776 963 [ - + ]: 6 : if (SlotIsPhysical(MyReplicationSlot))
776 akapila@postgresql.o 964 [ # # ]:UBC 0 : ereport(ERROR,
965 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
966 : : errmsg("cannot use %s with a physical replication slot",
967 : : "ALTER_REPLICATION_SLOT"));
968 : :
760 akapila@postgresql.o 969 [ + + ]:CBC 6 : if (RecoveryInProgress())
970 : : {
971 : : /*
972 : : * Do not allow users to alter the slots which are currently being
973 : : * synced from the primary to the standby.
974 : : */
975 [ + - ]: 1 : if (MyReplicationSlot->data.synced)
976 [ + - ]: 1 : ereport(ERROR,
977 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
978 : : errmsg("cannot alter replication slot \"%s\"", name),
979 : : errdetail("This replication slot is being synchronized from the primary server."));
980 : :
981 : : /*
982 : : * Do not allow users to enable failover on the standby as we do not
983 : : * support sync to the cascading standby.
984 : : */
599 akapila@postgresql.o 985 [ # # # # ]:UBC 0 : if (failover && *failover)
760 986 [ # # ]: 0 : ereport(ERROR,
987 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
988 : : errmsg("cannot enable failover for a replication slot"
989 : : " on the standby"));
990 : : }
991 : :
599 akapila@postgresql.o 992 [ + + ]:CBC 5 : if (failover)
993 : : {
994 : : /*
995 : : * Do not allow users to enable failover for temporary slots as we do
996 : : * not support syncing temporary slots to the standby.
997 : : */
998 [ + + - + ]: 4 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
599 akapila@postgresql.o 999 [ # # ]:UBC 0 : ereport(ERROR,
1000 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1001 : : errmsg("cannot enable failover for a temporary replication slot"));
1002 : :
599 akapila@postgresql.o 1003 [ + - ]:CBC 4 : if (MyReplicationSlot->data.failover != *failover)
1004 : : {
1005 [ - + ]: 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
1006 : 4 : MyReplicationSlot->data.failover = *failover;
1007 : 4 : SpinLockRelease(&MyReplicationSlot->mutex);
1008 : :
1009 : 4 : update_slot = true;
1010 : : }
1011 : : }
1012 : :
1013 [ + + + - ]: 5 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
1014 : : {
767 1015 [ - + ]: 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
599 1016 : 1 : MyReplicationSlot->data.two_phase = *two_phase;
767 1017 : 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1018 : :
599 1019 : 1 : update_slot = true;
1020 : : }
1021 : :
1022 [ + - ]: 5 : if (update_slot)
1023 : : {
767 1024 : 5 : ReplicationSlotMarkDirty();
1025 : 5 : ReplicationSlotSave();
1026 : : }
1027 : :
776 1028 : 5 : ReplicationSlotRelease();
1029 : 5 : }
1030 : :
1031 : : /*
1032 : : * Permanently drop the currently acquired replication slot.
1033 : : */
1034 : : void
4395 rhaas@postgresql.org 1035 : 442 : ReplicationSlotDropAcquired(void)
1036 : : {
1037 : 442 : ReplicationSlot *slot = MyReplicationSlot;
1038 : :
1039 [ - + ]: 442 : Assert(MyReplicationSlot != NULL);
1040 : :
1041 : : /* slot isn't acquired anymore */
1042 : 442 : MyReplicationSlot = NULL;
1043 : :
3384 peter_e@gmx.net 1044 : 442 : ReplicationSlotDropPtr(slot);
1045 : 442 : }
1046 : :
1047 : : /*
1048 : : * Permanently drop the replication slot which will be released by the point
1049 : : * this function returns.
1050 : : */
1051 : : static void
1052 : 593 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1053 : : {
1054 : : char path[MAXPGPATH];
1055 : : char tmppath[MAXPGPATH];
1056 : :
1057 : : /*
1058 : : * If some other backend ran this code concurrently with us, we might try
1059 : : * to delete a slot with a certain name while someone else was trying to
1060 : : * create a slot with the same name.
1061 : : */
4426 rhaas@postgresql.org 1062 : 593 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1063 : :
1064 : : /* Generate pathnames. */
562 michael@paquier.xyz 1065 : 593 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1066 : 593 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1067 : :
1068 : : /*
1069 : : * Rename the slot directory on disk, so that we'll no longer recognize
1070 : : * this as a valid slot. Note that if this fails, we've got to mark the
1071 : : * slot inactive before bailing out. If we're dropping an ephemeral or a
1072 : : * temporary slot, we better never fail hard as the caller won't expect
1073 : : * the slot to survive and this might get called during error handling.
1074 : : */
4395 rhaas@postgresql.org 1075 [ + - ]: 593 : if (rename(path, tmppath) == 0)
1076 : : {
1077 : : /*
1078 : : * We need to fsync() the directory we just renamed and its parent to
1079 : : * make sure that our changes are on disk in a crash-safe fashion. If
1080 : : * fsync() fails, we can't be sure whether the changes are on disk or
1081 : : * not. For now, we handle that by panicking;
1082 : : * StartupReplicationSlots() will try to straighten it out after
1083 : : * restart.
1084 : : */
1085 : 593 : START_CRIT_SECTION();
1086 : 593 : fsync_fname(tmppath, true);
562 michael@paquier.xyz 1087 : 593 : fsync_fname(PG_REPLSLOT_DIR, true);
4395 rhaas@postgresql.org 1088 [ - + ]: 593 : END_CRIT_SECTION();
1089 : : }
1090 : : else
1091 : : {
3384 peter_e@gmx.net 1092 :UBC 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1093 : :
4426 rhaas@postgresql.org 1094 [ # # ]: 0 : SpinLockAcquire(&slot->mutex);
33 heikki.linnakangas@i 1095 :UNC 0 : slot->active_proc = INVALID_PROC_NUMBER;
4426 rhaas@postgresql.org 1096 :UBC 0 : SpinLockRelease(&slot->mutex);
1097 : :
1098 : : /* wake up anyone waiting on this slot */
3155 alvherre@alvh.no-ip. 1099 : 0 : ConditionVariableBroadcast(&slot->active_cv);
1100 : :
4395 rhaas@postgresql.org 1101 [ # # # # ]: 0 : ereport(fail_softly ? WARNING : ERROR,
1102 : : (errcode_for_file_access(),
1103 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
1104 : : path, tmppath)));
1105 : : }
1106 : :
1107 : : /*
1108 : : * The slot is definitely gone. Lock out concurrent scans of the array
1109 : : * long enough to kill it. It's OK to clear the active PID here without
1110 : : * grabbing the mutex because nobody else can be scanning the array here,
1111 : : * and nobody can be attached to this slot and thus access it without
1112 : : * scanning the array.
1113 : : *
1114 : : * Also wake up processes waiting for it.
1115 : : */
4426 rhaas@postgresql.org 1116 :CBC 593 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
33 heikki.linnakangas@i 1117 :GNC 593 : slot->active_proc = INVALID_PROC_NUMBER;
4426 rhaas@postgresql.org 1118 :CBC 593 : slot->in_use = false;
1119 : 593 : LWLockRelease(ReplicationSlotControlLock);
3155 alvherre@alvh.no-ip. 1120 : 593 : ConditionVariableBroadcast(&slot->active_cv);
1121 : :
1122 : : /*
1123 : : * Slot is dead and doesn't prevent resource removal anymore, recompute
1124 : : * limits.
1125 : : */
4395 rhaas@postgresql.org 1126 : 593 : ReplicationSlotsComputeRequiredXmin(false);
4426 1127 : 593 : ReplicationSlotsComputeRequiredLSN();
1128 : :
1129 : : /*
1130 : : * If removing the directory fails, the worst thing that will happen is
1131 : : * that the user won't be able to create a new slot with the same name
1132 : : * until the next server restart. We warn about it, but that's all.
1133 : : */
1134 [ - + ]: 593 : if (!rmtree(tmppath, true))
4426 rhaas@postgresql.org 1135 [ # # ]:UBC 0 : ereport(WARNING,
1136 : : (errmsg("could not remove directory \"%s\"", tmppath)));
1137 : :
1138 : : /*
1139 : : * Drop the statistics entry for the replication slot. Do this while
1140 : : * holding ReplicationSlotAllocationLock so that we don't drop a
1141 : : * statistics entry for another slot with the same name just created in
1142 : : * another session.
1143 : : */
1984 akapila@postgresql.o 1144 [ + + ]:CBC 593 : if (SlotIsLogical(slot))
1439 andres@anarazel.de 1145 : 431 : pgstat_drop_replslot(slot);
1146 : :
1147 : : /*
1148 : : * We release this at the very end, so that nobody starts trying to create
1149 : : * a slot while we're still cleaning up the detritus of the old one.
1150 : : */
4426 rhaas@postgresql.org 1151 : 593 : LWLockRelease(ReplicationSlotAllocationLock);
1152 : 593 : }
1153 : :
1154 : : /*
1155 : : * Serialize the currently acquired slot's state from memory to disk, thereby
1156 : : * guaranteeing the current state will survive a crash.
1157 : : */
1158 : : void
1159 : 1467 : ReplicationSlotSave(void)
1160 : : {
1161 : : char path[MAXPGPATH];
1162 : :
1163 [ - + ]: 1467 : Assert(MyReplicationSlot != NULL);
1164 : :
562 michael@paquier.xyz 1165 : 1467 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
4426 rhaas@postgresql.org 1166 : 1467 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1167 : 1467 : }
1168 : :
1169 : : /*
1170 : : * Signal that it would be useful if the currently acquired slot would be
1171 : : * flushed out to disk.
1172 : : *
1173 : : * Note that the actual flush to disk can be delayed for a long time, if
1174 : : * required for correctness explicitly do a ReplicationSlotSave().
1175 : : */
1176 : : void
1177 : 39589 : ReplicationSlotMarkDirty(void)
1178 : : {
3813 1179 : 39589 : ReplicationSlot *slot = MyReplicationSlot;
1180 : :
4426 1181 [ - + ]: 39589 : Assert(MyReplicationSlot != NULL);
1182 : :
3813 1183 [ - + ]: 39589 : SpinLockAcquire(&slot->mutex);
1184 : 39589 : MyReplicationSlot->just_dirtied = true;
1185 : 39589 : MyReplicationSlot->dirty = true;
1186 : 39589 : SpinLockRelease(&slot->mutex);
4426 1187 : 39589 : }
1188 : :
1189 : : /*
1190 : : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1191 : : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1192 : : */
1193 : : void
4395 1194 : 484 : ReplicationSlotPersist(void)
1195 : : {
1196 : 484 : ReplicationSlot *slot = MyReplicationSlot;
1197 : :
1198 [ - + ]: 484 : Assert(slot != NULL);
1199 [ - + ]: 484 : Assert(slot->data.persistency != RS_PERSISTENT);
1200 : :
3813 1201 [ - + ]: 484 : SpinLockAcquire(&slot->mutex);
1202 : 484 : slot->data.persistency = RS_PERSISTENT;
1203 : 484 : SpinLockRelease(&slot->mutex);
1204 : :
4395 1205 : 484 : ReplicationSlotMarkDirty();
1206 : 484 : ReplicationSlotSave();
1207 : 484 : }
1208 : :
1209 : : /*
1210 : : * Compute the oldest xmin across all slots and store it in the ProcArray.
1211 : : *
1212 : : * If already_locked is true, both the ReplicationSlotControlLock and the
1213 : : * ProcArrayLock have already been acquired exclusively. It is crucial that the
1214 : : * caller first acquires the ReplicationSlotControlLock, followed by the
1215 : : * ProcArrayLock, to prevent any undetectable deadlocks since this function
1216 : : * acquires them in that order.
1217 : : */
1218 : : void
1219 : 2497 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1220 : : {
1221 : : int i;
4426 1222 : 2497 : TransactionId agg_xmin = InvalidTransactionId;
4395 1223 : 2497 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1224 : :
4426 1225 [ - + ]: 2497 : Assert(ReplicationSlotCtl != NULL);
75 msawada@postgresql.o 1226 [ + + + - : 2497 : Assert(!already_locked ||
- + ]
1227 : : (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1228 : : LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1229 : :
1230 : : /*
1231 : : * Hold the ReplicationSlotControlLock until after updating the slot xmin
1232 : : * values, so no backend updates the initial xmin for newly created slot
1233 : : * concurrently. A shared lock is used here to minimize lock contention,
1234 : : * especially when many slots exist and advancements occur frequently.
1235 : : * This is safe since an exclusive lock is taken during initial slot xmin
1236 : : * update in slot creation.
1237 : : *
1238 : : * One might think that we can hold the ProcArrayLock exclusively and
1239 : : * update the slot xmin values, but it could increase lock contention on
1240 : : * the ProcArrayLock, which is not great since this function can be called
1241 : : * at non-negligible frequency.
1242 : : *
1243 : : * Concurrent invocation of this function may cause the computed slot xmin
1244 : : * to regress. However, this is harmless because tuples prior to the most
1245 : : * recent xmin are no longer useful once advancement occurs (see
1246 : : * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1247 : : * before updating the effective_xmin). Thus, such regression merely
1248 : : * prevents VACUUM from prematurely removing tuples without causing the
1249 : : * early deletion of required data.
1250 : : */
1251 [ + + ]: 2497 : if (!already_locked)
1252 : 1996 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1253 : :
4426 rhaas@postgresql.org 1254 [ + + ]: 25413 : for (i = 0; i < max_replication_slots; i++)
1255 : : {
1256 : 22916 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1257 : : TransactionId effective_xmin;
1258 : : TransactionId effective_catalog_xmin;
1259 : : bool invalidated;
1260 : :
1261 [ + + ]: 22916 : if (!s->in_use)
1262 : 20486 : continue;
1263 : :
3813 1264 [ - + ]: 2430 : SpinLockAcquire(&s->mutex);
1265 : 2430 : effective_xmin = s->effective_xmin;
1266 : 2430 : effective_catalog_xmin = s->effective_catalog_xmin;
1073 andres@anarazel.de 1267 : 2430 : invalidated = s->data.invalidated != RS_INVAL_NONE;
3813 rhaas@postgresql.org 1268 : 2430 : SpinLockRelease(&s->mutex);
1269 : :
1270 : : /* invalidated slots need not apply */
1209 alvherre@alvh.no-ip. 1271 [ + + ]: 2430 : if (invalidated)
1272 : 26 : continue;
1273 : :
1274 : : /* check the data xmin */
4426 rhaas@postgresql.org 1275 [ + + + + ]: 2404 : if (TransactionIdIsValid(effective_xmin) &&
1276 [ - + ]: 5 : (!TransactionIdIsValid(agg_xmin) ||
1277 : 5 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1278 : 318 : agg_xmin = effective_xmin;
1279 : :
1280 : : /* check the catalog xmin */
4395 1281 [ + + + + ]: 2404 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1282 [ + + ]: 996 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1283 : 996 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1284 : 1232 : agg_catalog_xmin = effective_catalog_xmin;
1285 : : }
1286 : :
1287 : 2497 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1288 : :
75 msawada@postgresql.o 1289 [ + + ]: 2497 : if (!already_locked)
1290 : 1996 : LWLockRelease(ReplicationSlotControlLock);
4426 rhaas@postgresql.org 1291 : 2497 : }
1292 : :
1293 : : /*
1294 : : * Compute the oldest restart LSN across all slots and inform xlog module.
1295 : : *
1296 : : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1297 : : * purpose, we don't try to account for that, because this module doesn't
1298 : : * know what to compare against.
1299 : : */
1300 : : void
1301 : 40265 : ReplicationSlotsComputeRequiredLSN(void)
1302 : : {
1303 : : int i;
4331 bruce@momjian.us 1304 : 40265 : XLogRecPtr min_required = InvalidXLogRecPtr;
1305 : :
4426 rhaas@postgresql.org 1306 [ - + ]: 40265 : Assert(ReplicationSlotCtl != NULL);
1307 : :
1308 : 40265 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1309 [ + + ]: 438979 : for (i = 0; i < max_replication_slots; i++)
1310 : : {
1311 : 398714 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1312 : : XLogRecPtr restart_lsn;
1313 : : XLogRecPtr last_saved_restart_lsn;
1314 : : bool invalidated;
1315 : : ReplicationSlotPersistency persistency;
1316 : :
1317 [ + + ]: 398714 : if (!s->in_use)
1318 : 358252 : continue;
1319 : :
3813 1320 [ - + ]: 40462 : SpinLockAcquire(&s->mutex);
274 akorotkov@postgresql 1321 : 40462 : persistency = s->data.persistency;
3813 rhaas@postgresql.org 1322 : 40462 : restart_lsn = s->data.restart_lsn;
1073 andres@anarazel.de 1323 : 40462 : invalidated = s->data.invalidated != RS_INVAL_NONE;
274 akorotkov@postgresql 1324 : 40462 : last_saved_restart_lsn = s->last_saved_restart_lsn;
3813 rhaas@postgresql.org 1325 : 40462 : SpinLockRelease(&s->mutex);
1326 : :
1327 : : /* invalidated slots need not apply */
1073 andres@anarazel.de 1328 [ + + ]: 40462 : if (invalidated)
1329 : 27 : continue;
1330 : :
1331 : : /*
1332 : : * For persistent slot use last_saved_restart_lsn to compute the
1333 : : * oldest LSN for removal of WAL segments. The segments between
1334 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1335 : : * persistent slot in the case of database crash. Non-persistent
1336 : : * slots can't survive the database crash, so we don't care about
1337 : : * last_saved_restart_lsn for them.
1338 : : */
274 akorotkov@postgresql 1339 [ + + ]: 40435 : if (persistency == RS_PERSISTENT)
1340 : : {
129 alvherre@kurilemu.de 1341 [ + + + + ]:GNC 39566 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1342 : : restart_lsn > last_saved_restart_lsn)
1343 : : {
274 akorotkov@postgresql 1344 :CBC 37257 : restart_lsn = last_saved_restart_lsn;
1345 : : }
1346 : : }
1347 : :
129 alvherre@kurilemu.de 1348 [ + + + + ]:GNC 40435 : if (XLogRecPtrIsValid(restart_lsn) &&
1349 [ + + ]: 1251 : (!XLogRecPtrIsValid(min_required) ||
1350 : : restart_lsn < min_required))
4426 rhaas@postgresql.org 1351 :CBC 39253 : min_required = restart_lsn;
1352 : : }
1353 : 40265 : LWLockRelease(ReplicationSlotControlLock);
1354 : :
1355 : 40265 : XLogSetReplicationSlotMinimumLSN(min_required);
1356 : 40265 : }
1357 : :
1358 : : /*
1359 : : * Compute the oldest WAL LSN required by *logical* decoding slots..
1360 : : *
1361 : : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1362 : : * slots exist.
1363 : : *
1364 : : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1365 : : * ignores physical replication slots.
1366 : : *
1367 : : * The results aren't required frequently, so we don't maintain a precomputed
1368 : : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1369 : : */
1370 : : XLogRecPtr
4395 1371 : 3604 : ReplicationSlotsComputeLogicalRestartLSN(void)
1372 : : {
1373 : 3604 : XLogRecPtr result = InvalidXLogRecPtr;
1374 : : int i;
1375 : :
1376 [ + + ]: 3604 : if (max_replication_slots <= 0)
4395 rhaas@postgresql.org 1377 :GBC 2 : return InvalidXLogRecPtr;
1378 : :
4395 rhaas@postgresql.org 1379 :CBC 3602 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1380 : :
1381 [ + + ]: 39092 : for (i = 0; i < max_replication_slots; i++)
1382 : : {
1383 : : ReplicationSlot *s;
1384 : : XLogRecPtr restart_lsn;
1385 : : XLogRecPtr last_saved_restart_lsn;
1386 : : bool invalidated;
1387 : : ReplicationSlotPersistency persistency;
1388 : :
1389 : 35490 : s = &ReplicationSlotCtl->replication_slots[i];
1390 : :
1391 : : /* cannot change while ReplicationSlotCtlLock is held */
1392 [ + + ]: 35490 : if (!s->in_use)
1393 : 34698 : continue;
1394 : :
1395 : : /* we're only interested in logical slots */
3869 andres@anarazel.de 1396 [ + + ]: 792 : if (!SlotIsLogical(s))
4395 rhaas@postgresql.org 1397 : 544 : continue;
1398 : :
1399 : : /* read once, it's ok if it increases while we're checking */
1400 [ - + ]: 248 : SpinLockAcquire(&s->mutex);
274 akorotkov@postgresql 1401 : 248 : persistency = s->data.persistency;
4395 rhaas@postgresql.org 1402 : 248 : restart_lsn = s->data.restart_lsn;
1073 andres@anarazel.de 1403 : 248 : invalidated = s->data.invalidated != RS_INVAL_NONE;
274 akorotkov@postgresql 1404 : 248 : last_saved_restart_lsn = s->last_saved_restart_lsn;
4395 rhaas@postgresql.org 1405 : 248 : SpinLockRelease(&s->mutex);
1406 : :
1407 : : /* invalidated slots need not apply */
1073 andres@anarazel.de 1408 [ + + ]: 248 : if (invalidated)
1409 : 10 : continue;
1410 : :
1411 : : /*
1412 : : * For persistent slot use last_saved_restart_lsn to compute the
1413 : : * oldest LSN for removal of WAL segments. The segments between
1414 : : * last_saved_restart_lsn and restart_lsn might be needed by a
1415 : : * persistent slot in the case of database crash. Non-persistent
1416 : : * slots can't survive the database crash, so we don't care about
1417 : : * last_saved_restart_lsn for them.
1418 : : */
274 akorotkov@postgresql 1419 [ + + ]: 238 : if (persistency == RS_PERSISTENT)
1420 : : {
129 alvherre@kurilemu.de 1421 [ + - - + ]:GNC 236 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1422 : : restart_lsn > last_saved_restart_lsn)
1423 : : {
274 akorotkov@postgresql 1424 :UBC 0 : restart_lsn = last_saved_restart_lsn;
1425 : : }
1426 : : }
1427 : :
129 alvherre@kurilemu.de 1428 [ - + ]:GNC 238 : if (!XLogRecPtrIsValid(restart_lsn))
2168 alvherre@alvh.no-ip. 1429 :UBC 0 : continue;
1430 : :
129 alvherre@kurilemu.de 1431 [ + + + + ]:GNC 238 : if (!XLogRecPtrIsValid(result) ||
1432 : : restart_lsn < result)
4395 rhaas@postgresql.org 1433 :CBC 186 : result = restart_lsn;
1434 : : }
1435 : :
1436 : 3602 : LWLockRelease(ReplicationSlotControlLock);
1437 : :
1438 : 3602 : return result;
1439 : : }
1440 : :
1441 : : /*
1442 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1443 : : * passed database oid.
1444 : : *
1445 : : * Returns true if there are any slots referencing the database. *nslots will
1446 : : * be set to the absolute number of slots in the database, *nactive to ones
1447 : : * currently active.
1448 : : */
1449 : : bool
1450 : 48 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1451 : : {
1452 : : int i;
1453 : :
1454 : 48 : *nslots = *nactive = 0;
1455 : :
1456 [ - + ]: 48 : if (max_replication_slots <= 0)
4395 rhaas@postgresql.org 1457 :UBC 0 : return false;
1458 : :
4395 rhaas@postgresql.org 1459 :CBC 48 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1460 [ + + ]: 501 : for (i = 0; i < max_replication_slots; i++)
1461 : : {
1462 : : ReplicationSlot *s;
1463 : :
1464 : 453 : s = &ReplicationSlotCtl->replication_slots[i];
1465 : :
1466 : : /* cannot change while ReplicationSlotCtlLock is held */
1467 [ + + ]: 453 : if (!s->in_use)
1468 : 432 : continue;
1469 : :
1470 : : /* only logical slots are database specific, skip */
3869 andres@anarazel.de 1471 [ + + ]: 21 : if (!SlotIsLogical(s))
4391 bruce@momjian.us 1472 : 10 : continue;
1473 : :
1474 : : /* not our database, skip */
4395 rhaas@postgresql.org 1475 [ + + ]: 11 : if (s->data.database != dboid)
1476 : 8 : continue;
1477 : :
1478 : : /* NB: intentionally counting invalidated slots */
1479 : :
1480 : : /* count slots with spinlock held */
1481 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
1482 : 3 : (*nslots)++;
33 heikki.linnakangas@i 1483 [ + + ]:GNC 3 : if (s->active_proc != INVALID_PROC_NUMBER)
4395 rhaas@postgresql.org 1484 :CBC 1 : (*nactive)++;
1485 : 3 : SpinLockRelease(&s->mutex);
1486 : : }
1487 : 48 : LWLockRelease(ReplicationSlotControlLock);
1488 : :
1489 [ + + ]: 48 : if (*nslots > 0)
1490 : 3 : return true;
1491 : 45 : return false;
1492 : : }
1493 : :
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 *
 * If it drops the last logical slot in the cluster, it requests to disable
 * logical decoding.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;
	bool		found_valid_logicalslot;	/* any valid logical slot left
											 * anywhere in the cluster? */
	bool		dropped = false;	/* did we drop at least one slot? */

	if (max_replication_slots <= 0)
		return;

restart:

	/*
	 * Every pass rescans the whole slot array from scratch: dropping a slot
	 * below requires releasing ReplicationSlotControlLock, after which the
	 * set of slots may have changed, so any partial scan state is stale.
	 */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		ProcNumber	active_proc;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/*
		 * Check logical slots on other databases too so we can disable
		 * logical decoding only if no slots in the cluster.
		 */
		SpinLockAcquire(&s->mutex);
		found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
		SpinLockRelease(&s->mutex);

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots to drop */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_proc = s->active_proc;
		if (active_proc == INVALID_PROC_NUMBER)
		{
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_proc != INVALID_PROC_NUMBER)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, GetPGProcByNumber(active_proc)->pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		dropped = true;
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we dropped a slot and the final scan saw no remaining valid logical
	 * slot anywhere in the cluster, request that logical decoding be
	 * disabled.
	 */
	if (dropped && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
1611 : :
1612 : : /*
1613 : : * Returns true if there is at least one in-use valid logical replication slot.
1614 : : */
1615 : : bool
82 msawada@postgresql.o 1616 :GNC 469 : CheckLogicalSlotExists(void)
1617 : : {
1618 : 469 : bool found = false;
1619 : :
1620 [ + + ]: 469 : if (max_replication_slots <= 0)
1621 : 1 : return false;
1622 : :
1623 : 468 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1624 [ + + ]: 5064 : for (int i = 0; i < max_replication_slots; i++)
1625 : : {
1626 : : ReplicationSlot *s;
1627 : : bool invalidated;
1628 : :
1629 : 4602 : s = &ReplicationSlotCtl->replication_slots[i];
1630 : :
1631 : : /* cannot change while ReplicationSlotCtlLock is held */
1632 [ + + ]: 4602 : if (!s->in_use)
1633 : 4574 : continue;
1634 : :
1635 [ + + ]: 28 : if (SlotIsPhysical(s))
1636 : 19 : continue;
1637 : :
1638 : 9 : SpinLockAcquire(&s->mutex);
1639 : 9 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1640 : 9 : SpinLockRelease(&s->mutex);
1641 : :
1642 [ + + ]: 9 : if (invalidated)
1643 : 3 : continue;
1644 : :
1645 : 6 : found = true;
1646 : 6 : break;
1647 : : }
1648 : 468 : LWLockRelease(ReplicationSlotControlLock);
1649 : :
1650 : 468 : return found;
1651 : : }
1652 : :
/*
 * Check whether the server's configuration supports using replication
 * slots.  Raises an ERROR if it does not; otherwise returns with no side
 * effects.
 */
void
CheckSlotRequirements(void)
{
	/*
	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
	 * needs the same check.
	 */

	/* Slots must be enabled via the max_replication_slots GUC. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));

	/* Slots require WAL detailed enough for replication. */
	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
}
1675 : :
1676 : : /*
1677 : : * Check whether the user has privilege to use replication slots.
1678 : : */
1679 : : void
1643 michael@paquier.xyz 1680 : 594 : CheckSlotPermissions(void)
1681 : : {
1095 peter@eisentraut.org 1682 [ + + ]: 594 : if (!has_rolreplication(GetUserId()))
1643 michael@paquier.xyz 1683 [ + - ]: 5 : ereport(ERROR,
1684 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1685 : : errmsg("permission denied to use replication slots"),
1686 : : errdetail("Only roles with the %s attribute may use replication slots.",
1687 : : "REPLICATION")));
1688 : 589 : }
1689 : :
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 *
 * The slot must already be acquired (MyReplicationSlot set) and must not
 * have reserved WAL yet; see the Asserts below.  Raises ERROR if the WAL
 * needed by the computed restart_lsn was removed concurrently.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	/* Publish the chosen restart_lsn under the slot's spinlock. */
	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/* Checkpoint shouldn't remove the required WAL. */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1771 : :
/*
 * Report that replication slot needs to be invalidated
 *
 * "cause" selects the errdetail (and, for some causes, errhint) text and
 * must not be RS_INVAL_NONE.  When "terminating" is true we report that the
 * slot's owning process "pid" is being terminated; otherwise we report that
 * the slot itself is being invalidated.  restart_lsn/oldestLSN,
 * snapshotConflictHorizon and slot_idle_seconds supply the cause-specific
 * details; only the ones relevant to "cause" are consulted.
 */
static void
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
					   bool terminating,
					   int pid,
					   NameData slotname,
					   XLogRecPtr restart_lsn,
					   XLogRecPtr oldestLSN,
					   TransactionId snapshotConflictHorizon,
					   long slot_idle_seconds)
{
	StringInfoData err_detail;
	StringInfoData err_hint;

	initStringInfo(&err_detail);
	initStringInfo(&err_hint);

	/* Build the cause-specific detail/hint strings. */
	switch (cause)
	{
		case RS_INVAL_WAL_REMOVED:
			{
				/* how far the slot fell behind the WAL-removal limit */
				uint64		ex = oldestLSN - restart_lsn;

				appendStringInfo(&err_detail,
								 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
										  "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
										  ex),
								 LSN_FORMAT_ARGS(restart_lsn),
								 ex);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "max_slot_wal_keep_size");
				break;
			}
		case RS_INVAL_HORIZON:
			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
							 snapshotConflictHorizon);
			break;

		case RS_INVAL_WAL_LEVEL:
			appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
			break;

		case RS_INVAL_IDLE_TIMEOUT:
			{
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
								 slot_idle_seconds, "idle_replication_slot_timeout",
								 idle_replication_slot_timeout_secs);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "idle_replication_slot_timeout");
				break;
			}
		case RS_INVAL_NONE:
			pg_unreachable();
	}

	/* attach the hint only when the cause above produced one */
	ereport(LOG,
			terminating ?
			errmsg("terminating process %d to release replication slot \"%s\"",
				   pid, NameStr(slotname)) :
			errmsg("invalidating obsolete replication slot \"%s\"",
				   NameStr(slotname)),
			errdetail_internal("%s", err_detail.data),
			err_hint.len ? errhint("%s", err_hint.data) : 0);

	pfree(err_detail.data);
	pfree(err_hint.data);
}
1844 : :
1845 : : /*
1846 : : * Can we invalidate an idle replication slot?
1847 : : *
1848 : : * Idle timeout invalidation is allowed only when:
1849 : : *
1850 : : * 1. Idle timeout is set
1851 : : * 2. Slot has reserved WAL
1852 : : * 3. Slot is inactive
1853 : : * 4. The slot is not being synced from the primary while the server is in
1854 : : * recovery. This is because synced slots are always considered to be
1855 : : * inactive because they don't perform logical decoding to produce changes.
1856 : : */
1857 : : static inline bool
1858 : 373 : CanInvalidateIdleSlot(ReplicationSlot *s)
1859 : : {
247 fujii@postgresql.org 1860 : 373 : return (idle_replication_slot_timeout_secs != 0 &&
129 alvherre@kurilemu.de 1861 [ # # ]:UNC 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
389 akapila@postgresql.o 1862 [ - + - - ]:CBC 373 : s->inactive_since > 0 &&
389 akapila@postgresql.o 1863 [ # # # # ]:UBC 0 : !(RecoveryInProgress() && s->data.synced));
1864 : : }
1865 : :
/*
 * DetermineSlotInvalidationCause - Determine the cause for which a slot
 * becomes invalid among the given possible causes.
 *
 * This function sequentially checks all possible invalidation causes and
 * returns the first one for which the slot is eligible for invalidation.
 *
 * The only caller, InvalidatePossiblyObsoleteSlot(), holds the slot's
 * spinlock while calling this, so the slot fields read here are stable.
 * *inactive_since is written only when RS_INVAL_IDLE_TIMEOUT is returned.
 */
static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
							   XLogRecPtr oldestLSN, Oid dboid,
							   TransactionId snapshotConflictHorizon,
							   TimestampTz *inactive_since, TimestampTz now)
{
	Assert(possible_causes != RS_INVAL_NONE);

	if (possible_causes & RS_INVAL_WAL_REMOVED)
	{
		XLogRecPtr	restart_lsn = s->data.restart_lsn;

		/* the slot still needs WAL older than the removal boundary */
		if (XLogRecPtrIsValid(restart_lsn) &&
			restart_lsn < oldestLSN)
			return RS_INVAL_WAL_REMOVED;
	}

	if (possible_causes & RS_INVAL_HORIZON)
	{
		/* invalid DB oid signals a shared relation */
		if (SlotIsLogical(s) &&
			(dboid == InvalidOid || dboid == s->data.database))
		{
			TransactionId effective_xmin = s->effective_xmin;
			TransactionId catalog_effective_xmin = s->effective_catalog_xmin;

			/* conflict if either xmin is at or behind the conflict horizon */
			if (TransactionIdIsValid(effective_xmin) &&
				TransactionIdPrecedesOrEquals(effective_xmin,
											  snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
			else if (TransactionIdIsValid(catalog_effective_xmin) &&
					 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
												   snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
		}
	}

	if (possible_causes & RS_INVAL_WAL_LEVEL)
	{
		/* only logical slots are affected by this cause */
		if (SlotIsLogical(s))
			return RS_INVAL_WAL_LEVEL;
	}

	if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
	{
		Assert(now > 0);

		if (CanInvalidateIdleSlot(s))
		{
			/*
			 * Simulate the invalidation due to idle_timeout to test the
			 * timeout behavior promptly, without waiting for it to trigger
			 * naturally.
			 */
#ifdef USE_INJECTION_POINTS
			if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
			{
				*inactive_since = 0;	/* since the beginning of time */
				return RS_INVAL_IDLE_TIMEOUT;
			}
#endif

			/*
			 * Check if the slot needs to be invalidated due to
			 * idle_replication_slot_timeout GUC.
			 */
			if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
												  idle_replication_slot_timeout_secs))
			{
				*inactive_since = s->inactive_since;
				return RS_INVAL_IDLE_TIMEOUT;
			}
		}
	}

	return RS_INVAL_NONE;
}
1950 : :
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and mark it invalid, if necessary and possible.
 *
 * Returns true if the slot was invalidated.
 *
 * Set *released_lock_out if ReplicationSlotControlLock was released in the
 * interim (and in that case we're not holding the lock at return, otherwise
 * we are).
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 *
 * Must be entered with ReplicationSlotControlLock held in shared mode (see
 * the Assert at the top of the loop).
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *released_lock_out)
{
	int			last_signaled_pid = 0;	/* avoids re-signalling the same PID */
	bool		released_lock = false;
	bool		invalidated = false;
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		ProcNumber	active_proc;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/* slot may have been dropped while we slept below; nothing to do */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated. We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																&inactive_since,
																now);

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_proc = s->active_proc;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately. Otherwise we'll signal the owning process, below, and
		 * retry.
		 *
		 * Note: Unlike other slot attributes, slot's inactive_since can't be
		 * changed until the acquired slot is released or the owning process
		 * is terminated. So, the inactive slot can only be invalidated
		 * immediately without being terminated.
		 */
		if (active_proc == INVALID_PROC_NUMBER)
		{
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			invalidated = true;
		}
		else
		{
			active_pid = GetPGProcByNumber(active_proc)->pid;
			Assert(active_pid != 0);
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_proc != INVALID_PROC_NUMBER)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it. (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				if (MyBackendType == B_STARTUP)
					(void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
												  active_pid,
												  RECOVERY_CONFLICT_LOGICALSLOT);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 *
			 * Note: It is possible for a slot to advance its restart_lsn or
			 * xmin values sufficiently between when we release the mutex and
			 * when we recheck, moving from a conflicting state to a non
			 * conflicting state. This is intentional and safe: if the slot
			 * has caught up while we're busy here, the resources we were
			 * concerned about (WAL segments or tuples) have not yet been
			 * removed, and there's no reason to invalidate the slot.
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	*released_lock_out = released_lock;
	return invalidated;
}
2180 : :
/*
 * Invalidate slots that require resources about to be removed.
 *
 * Returns true when any slot have got invalidated.
 *
 * Whether a slot needs to be invalidated depends on the invalidation cause.
 * A slot is invalidated if it:
 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
 *   db; dboid may be InvalidOid for shared relations
 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
 *   logical.
 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
 *   "idle_replication_slot_timeout" duration.
 *
 * Note: This function attempts to invalidate the slot for multiple possible
 * causes in a single pass, minimizing redundant iterations. The "cause"
 * parameter can be a MASK representing one or more of the defined causes.
 *
 * If it invalidates the last logical slot in the cluster, it requests to
 * disable logical decoding.
 *
 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 */
bool
InvalidateObsoleteReplicationSlots(uint32 possible_causes,
								   XLogSegNo oldestSegno, Oid dboid,
								   TransactionId snapshotConflictHorizon)
{
	XLogRecPtr	oldestLSN;
	bool		invalidated = false;
	bool		invalidated_logical = false;	/* any logical slot hit? */
	bool		found_valid_logicalslot;	/* any valid logical slot left? */

	Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
	Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
	Assert(possible_causes != RS_INVAL_NONE);

	if (max_replication_slots == 0)
		return invalidated;

	/* convert the oldest-kept segment number into its starting LSN */
	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);

restart:

	/*
	 * Each pass rescans the whole slot array, since invalidating a slot
	 * below may release ReplicationSlotControlLock, after which the set of
	 * slots can change.
	 */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		bool		released_lock = false;

		if (!s->in_use)
			continue;

		/* Prevent invalidation of logical slots during binary upgrade */
		if (SlotIsLogical(s) && IsBinaryUpgrade)
		{
			/* still counts toward "a valid logical slot exists" */
			SpinLockAcquire(&s->mutex);
			found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
			SpinLockRelease(&s->mutex);

			continue;
		}

		if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
										   dboid, snapshotConflictHorizon,
										   &released_lock))
		{
			Assert(released_lock);

			/* Remember we have invalidated a physical or logical slot */
			invalidated = true;

			/*
			 * Additionally, remember we have invalidated a logical slot as we
			 * can request disabling logical decoding later.
			 */
			if (SlotIsLogical(s))
				invalidated_logical = true;
		}
		else
		{
			/*
			 * We need to check if the slot is invalidated here since
			 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
			 * is already invalidated.
			 */
			SpinLockAcquire(&s->mutex);
			found_valid_logicalslot |=
				(SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
			SpinLockRelease(&s->mutex);
		}

		/* if the lock was released, start from scratch */
		if (released_lock)
			goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If any slots have been invalidated, recalculate the resource limits.
	 */
	if (invalidated)
	{
		ReplicationSlotsComputeRequiredXmin(false);
		ReplicationSlotsComputeRequiredLSN();
	}

	/*
	 * Request the checkpointer to disable logical decoding if no valid
	 * logical slots remain. If called by the checkpointer during a
	 * checkpoint, only the request is initiated; actual deactivation is
	 * deferred until after the checkpoint completes.
	 */
	if (invalidated_logical && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();

	return invalidated;
}
2300 : :
/*
 * Flush all replication slots to disk.
 *
 * It is convenient to flush dirty replication slots at the time of checkpoint.
 * Additionally, in case of a shutdown checkpoint, we also identify the slots
 * for which the confirmed_flush LSN has been updated since the last time it
 * was saved and flush them.
 *
 * is_shutdown: true when called for a shutdown checkpoint; in that case
 * logical slots whose confirmed_flush advanced since the last save are
 * force-marked dirty so they get written out below.
 */
void
CheckPointReplicationSlots(bool is_shutdown)
{
	int			i;
	bool		last_saved_restart_lsn_updated = false;

	elog(DEBUG1, "performing replication slot checkpoint");

	/*
	 * Prevent any slot from being created/dropped while we're active. As we
	 * explicitly do *not* want to block iterating over replication_slots or
	 * acquiring a slot we cannot take the control lock - but that's OK,
	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
	 * enough to guarantee that nobody can change the in_use bits on us.
	 *
	 * Additionally, acquiring the Allocation lock is necessary to serialize
	 * the slot flush process with concurrent slot WAL reservation. This
	 * ensures that the WAL position being reserved is either flushed to disk
	 * or is beyond or equal to the redo pointer of the current checkpoint
	 * (See ReplicationSlotReserveWal for details).
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		char		path[MAXPGPATH];

		if (!s->in_use)
			continue;

		/* save the slot to disk, locking is handled in SaveSlotToPath() */
		sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));

		/*
		 * Slot's data is not flushed each time the confirmed_flush LSN is
		 * updated as that could lead to frequent writes.  However, we decide
		 * to force a flush of all logical slot's data at the time of shutdown
		 * if the confirmed_flush LSN is changed since we last flushed it to
		 * disk.  This helps in avoiding an unnecessary retreat of the
		 * confirmed_flush LSN after restart.
		 */
		if (is_shutdown && SlotIsLogical(s))
		{
			SpinLockAcquire(&s->mutex);

			/* only flush valid slots whose confirmed_flush moved forward */
			if (s->data.invalidated == RS_INVAL_NONE &&
				s->data.confirmed_flush > s->last_saved_confirmed_flush)
			{
				s->just_dirtied = true;
				s->dirty = true;
			}
			SpinLockRelease(&s->mutex);
		}

		/*
		 * Track if we're going to update slot's last_saved_restart_lsn. We
		 * need this to know if we need to recompute the required LSN.
		 */
		if (s->last_saved_restart_lsn != s->data.restart_lsn)
			last_saved_restart_lsn_updated = true;

		SaveSlotToPath(s, path, LOG);
	}
	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Recompute the required LSN if SaveSlotToPath() updated
	 * last_saved_restart_lsn for any slot.
	 */
	if (last_saved_restart_lsn_updated)
		ReplicationSlotsComputeRequiredLSN();
}
2382 : :
/*
 * Load all replication slots from disk into memory at server startup. This
 * needs to be run before we start crash recovery.
 *
 * Leftover ".tmp" directories from a crash during slot creation/deletion are
 * removed here; everything else is handed to RestoreSlotFromDisk().
 */
void
StartupReplicationSlots(void)
{
	DIR		   *replication_dir;
	struct dirent *replication_de;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	replication_dir = AllocateDir(PG_REPLSLOT_DIR);
	while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
	{
		char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
		PGFileType	de_type;

		if (strcmp(replication_de->d_name, ".") == 0 ||
			strcmp(replication_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
		de_type = get_dirent_type(path, replication_de, false, DEBUG1);

		/* we're only creating directories here, skip if it's not ours */
		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
			continue;

		/* we crashed while a slot was being setup or deleted, clean up */
		if (pg_str_endswith(replication_de->d_name, ".tmp"))
		{
			if (!rmtree(path, true))
			{
				ereport(WARNING,
						(errmsg("could not remove directory \"%s\"",
								path)));
				continue;
			}
			/* make the removal durable before moving on */
			fsync_fname(PG_REPLSLOT_DIR, true);
			continue;
		}

		/* looks like a slot in a normal state, restore */
		RestoreSlotFromDisk(replication_de->d_name);
	}
	FreeDir(replication_dir);

	/* currently no slots exist, we're done. */
	if (max_replication_slots <= 0)
		return;

	/* Now that we have recovered all the data, compute replication xmin */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();
}
2440 : :
2441 : : /* ----
2442 : : * Manipulation of on-disk state of replication slots
2443 : : *
2444 : : * NB: none of the routines below should take any notice whether a slot is the
2445 : : * current one or not, that's all handled a layer above.
2446 : : * ----
2447 : : */
/*
 * Create the initial on-disk state for a new replication slot.
 *
 * The slot directory is first built under a ".tmp" suffix; the state file is
 * written and fsync()ed inside it, and only then is the directory rename()d
 * into its final place and the rename made durable.  A crash at any point
 * therefore leaves either no visible slot directory or a complete one —
 * leftover ".tmp" directories are cleaned up at startup.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
	 * out at the MakePGDirectory() below, so we don't bother checking
	 * success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (MakePGDirectory(tmppath) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();
}
2505 : :
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Writes the slot's persistent data to <dir>/state via a temporary file plus
 * rename, under the slot's io_in_progress_lock.  Does nothing if the slot is
 * not dirty.
 *
 * elevel controls error severity: with ERROR (or higher) failures abort the
 * transaction; with a lower level (e.g. LOG from the checkpointer) each
 * failure path releases the lock, removes the temp file where applicable,
 * reports at that level, and returns without saving.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data out under the mutex for a consistent image */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2668 : :
/*
 * Load a single slot from disk into memory.
 *
 * Reads and validates PG_REPLSLOT_DIR/<name>/state (magic, version, length,
 * CRC), deletes non-persistent slots left over from a crash, verifies that
 * the server configuration still supports the slot's type, and claims the
 * first free shared-memory slot entry for it.  Corruption is reported at
 * PANIC; unmet configuration requirements at FATAL.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		if (wal_level < WAL_LEVEL_REPLICA)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

		/*
		 * In standby mode, the hot standby must be enabled.  This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level.  Otherwise, if the
		 * primary disables logical decoding while hot standby is disabled,
		 * logical slots would remain valid even after promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_proc = INVALID_PROC_NUMBER;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it. Use the same inactive_since
		 * time for all the slots.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2910 : :
2911 : : /*
2912 : : * Maps an invalidation reason for a replication slot to
2913 : : * ReplicationSlotInvalidationCause.
2914 : : */
2915 : : ReplicationSlotInvalidationCause
389 akapila@postgresql.o 2916 : 0 : GetSlotInvalidationCause(const char *cause_name)
2917 : : {
2918 [ # # ]: 0 : Assert(cause_name);
2919 : :
2920 : : /* Search lookup table for the cause having this name */
2921 [ # # ]: 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2922 : : {
2923 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2924 : 0 : return SlotInvalidationCauses[i].cause;
2925 : : }
2926 : :
2927 : 0 : Assert(false);
2928 : : return RS_INVAL_NONE; /* to keep compiler quiet */
2929 : : }
2930 : :
2931 : : /*
2932 : : * Maps a ReplicationSlotInvalidationCause to the invalidation
2933 : : * reason for a replication slot.
2934 : : */
2935 : : const char *
389 akapila@postgresql.o 2936 :CBC 44 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2937 : : {
2938 : : /* Search lookup table for the name of this cause */
2939 [ + - ]: 141 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2940 : : {
2941 [ + + ]: 141 : if (SlotInvalidationCauses[i].cause == cause)
2942 : 44 : return SlotInvalidationCauses[i].cause_name;
2943 : : }
2944 : :
389 akapila@postgresql.o 2945 :UBC 0 : Assert(false);
2946 : : return "none"; /* to keep compiler quiet */
2947 : : }
2948 : :
/*
 * A helper function to validate slots specified in GUC synchronized_standby_slots.
 *
 * The rawname will be parsed, and the result will be saved into *elemlist.
 * Note that rawname is modified in place by SplitIdentifierString, so the
 * caller must pass a modifiable copy.  On failure, reports details via the
 * GUC_check_* machinery and returns false.
 */
static bool
validate_sync_standby_slots(char *rawname, List **elemlist)
{
	/* Verify syntax and parse string into a list of identifiers */
	if (!SplitIdentifierString(rawname, ',', elemlist))
	{
		GUC_check_errdetail("List syntax is invalid.");
		return false;
	}

	/* Iterate the list to validate each slot name */
	foreach_ptr(char, name, *elemlist)
	{
		int			err_code;
		char	   *err_msg = NULL;
		char	   *err_hint = NULL;

		if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
												 &err_msg, &err_hint))
		{
			GUC_check_errcode(err_code);
			GUC_check_errdetail("%s", err_msg);
			if (err_hint != NULL)
				GUC_check_errhint("%s", err_hint);
			return false;
		}
	}

	return true;
}
2984 : :
2985 : : /*
2986 : : * GUC check_hook for synchronized_standby_slots
2987 : : */
2988 : : bool
622 2989 : 1217 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2990 : : {
2991 : : char *rawname;
2992 : : char *ptr;
2993 : : List *elemlist;
2994 : : int size;
2995 : : bool ok;
2996 : : SyncStandbySlotsConfigData *config;
2997 : :
737 2998 [ + + ]: 1217 : if ((*newval)[0] == '\0')
2999 : 1197 : return true;
3000 : :
3001 : : /* Need a modifiable copy of the GUC string */
3002 : 20 : rawname = pstrdup(*newval);
3003 : :
3004 : : /* Now verify if the specified slots exist and have correct type */
622 3005 : 20 : ok = validate_sync_standby_slots(rawname, &elemlist);
3006 : :
737 3007 [ + + - + ]: 20 : if (!ok || elemlist == NIL)
3008 : : {
3009 : 2 : pfree(rawname);
3010 : 2 : list_free(elemlist);
3011 : 2 : return ok;
3012 : : }
3013 : :
3014 : : /* Compute the size required for the SyncStandbySlotsConfigData struct */
622 3015 : 18 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
737 3016 [ + - + + : 54 : foreach_ptr(char, slot_name, elemlist)
+ + ]
3017 : 18 : size += strlen(slot_name) + 1;
3018 : :
3019 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
622 3020 : 18 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
353 dgustafsson@postgres 3021 [ - + ]: 18 : if (!config)
353 dgustafsson@postgres 3022 :UBC 0 : return false;
3023 : :
3024 : : /* Transform the data into SyncStandbySlotsConfigData */
737 akapila@postgresql.o 3025 :CBC 18 : config->nslotnames = list_length(elemlist);
3026 : :
3027 : 18 : ptr = config->slot_names;
3028 [ + - + + : 54 : foreach_ptr(char, slot_name, elemlist)
+ + ]
3029 : : {
3030 : 18 : strcpy(ptr, slot_name);
3031 : 18 : ptr += strlen(slot_name) + 1;
3032 : : }
3033 : :
472 peter@eisentraut.org 3034 : 18 : *extra = config;
3035 : :
737 akapila@postgresql.o 3036 : 18 : pfree(rawname);
3037 : 18 : list_free(elemlist);
3038 : 18 : return true;
3039 : : }
3040 : :
3041 : : /*
3042 : : * GUC assign_hook for synchronized_standby_slots
3043 : : */
3044 : : void
622 3045 : 1214 : assign_synchronized_standby_slots(const char *newval, void *extra)
3046 : : {
3047 : : /*
3048 : : * The standby slots may have changed, so we must recompute the oldest
3049 : : * LSN.
3050 : : */
737 3051 : 1214 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3052 : :
622 3053 : 1214 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
737 3054 : 1214 : }
3055 : :
3056 : : /*
3057 : : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3058 : : */
3059 : : bool
622 3060 : 37648 : SlotExistsInSyncStandbySlots(const char *slot_name)
3061 : : {
3062 : : const char *standby_slot_name;
3063 : :
3064 : : /* Return false if there is no value in synchronized_standby_slots */
3065 [ + + ]: 37648 : if (synchronized_standby_slots_config == NULL)
737 3066 : 37633 : return false;
3067 : :
3068 : : /*
3069 : : * XXX: We are not expecting this list to be long so a linear search
3070 : : * shouldn't hurt but if that turns out not to be true then we can cache
3071 : : * this information for each WalSender as well.
3072 : : */
622 3073 : 15 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3074 [ + + ]: 23 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3075 : : {
737 3076 [ + + ]: 15 : if (strcmp(standby_slot_name, slot_name) == 0)
3077 : 7 : return true;
3078 : :
737 akapila@postgresql.o 3079 :GBC 8 : standby_slot_name += strlen(standby_slot_name) + 1;
3080 : : }
3081 : :
3082 : 8 : return false;
3083 : : }
3084 : :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0; /* slots with restart_lsn >= wait_for_lsn */
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;	/* oldest caught-up LSN */

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.  (Fast path via the cached
	 * ss_oldest_flush_lsn, which never retreats; see Assert below.)
	 */
	if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Walk the packed, NUL-separated slot-name list */
	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.  Breaking out leaves
		 * caught_up_slot_num short of nslotnames, so we return false below.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Snapshot the slot state consistently under its spinlock */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_proc == INVALID_PROC_NUMBER;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up, so the overall answer is false;
			 * exit the loop without incrementing caught_up_slot_num.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the minimum restart_lsn among the caught-up slots */
		if (!XLogRecPtrIsValid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* Advance past this NUL-terminated name to the next one */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3233 : :
3234 : : /*
3235 : : * Wait for physical standbys to confirm receiving the given lsn.
3236 : : *
3237 : : * Used by logical decoding SQL functions. It waits for physical standbys
3238 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3239 : : */
3240 : : void
3241 : 231 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3242 : : {
3243 : : /*
3244 : : * Don't need to wait for the standby to catch up if the current acquired
3245 : : * slot is not a logical failover slot, or there is no value in
3246 : : * synchronized_standby_slots.
3247 : : */
622 3248 [ + + + + ]: 231 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
737 3249 : 230 : return;
3250 : :
3251 : 1 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3252 : :
3253 : : for (;;)
3254 : : {
3255 [ - + ]: 2 : CHECK_FOR_INTERRUPTS();
3256 : :
3257 [ + + ]: 2 : if (ConfigReloadPending)
3258 : : {
3259 : 1 : ConfigReloadPending = false;
3260 : 1 : ProcessConfigFile(PGC_SIGHUP);
3261 : : }
3262 : :
3263 : : /* Exit if done waiting for every slot. */
3264 [ + + ]: 2 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3265 : 1 : break;
3266 : :
3267 : : /*
3268 : : * Wait for the slots in the synchronized_standby_slots to catch up,
3269 : : * but use a timeout (1s) so we can also check if the
3270 : : * synchronized_standby_slots has been changed.
3271 : : */
3272 : 1 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3273 : : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3274 : : }
3275 : :
3276 : 1 : ConditionVariableCancelSleep();
3277 : : }
|