Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * slot.c
4 : : * Replication slot management.
5 : : *
6 : : *
7 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/backend/replication/slot.c
12 : : *
13 : : * NOTES
14 : : *
15 : : * Replication slots are used to keep state about replication streams
16 : : * originating from this cluster. Their primary purpose is to prevent the
17 : : * premature removal of WAL or of old tuple versions in a manner that would
18 : : * interfere with replication; they are also useful for monitoring purposes.
19 : : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : : * on standbys (to support cascading setups). The requirement that slots be
21 : : * usable on standbys precludes storing them in the system catalogs.
22 : : *
23 : : * Each replication slot gets its own directory inside the directory
24 : : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : : * contain the slot's own data. Additional data can be stored alongside that
26 : : * file if required. While the server is running, the state data is also
27 : : * cached in memory for efficiency.
28 : : *
29 : : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : : * of a slot. The remaining data in each slot is protected by its mutex.
33 : : *
34 : : *-------------------------------------------------------------------------
35 : : */
36 : :
37 : : #include "postgres.h"
38 : :
39 : : #include <unistd.h>
40 : : #include <sys/stat.h>
41 : :
42 : : #include "access/transam.h"
43 : : #include "access/xlog_internal.h"
44 : : #include "access/xlogrecovery.h"
45 : : #include "common/file_utils.h"
46 : : #include "common/string.h"
47 : : #include "miscadmin.h"
48 : : #include "pgstat.h"
49 : : #include "postmaster/interrupt.h"
50 : : #include "replication/logicallauncher.h"
51 : : #include "replication/slotsync.h"
52 : : #include "replication/slot.h"
53 : : #include "replication/walsender_private.h"
54 : : #include "storage/fd.h"
55 : : #include "storage/ipc.h"
56 : : #include "storage/proc.h"
57 : : #include "storage/procarray.h"
58 : : #include "utils/builtins.h"
59 : : #include "utils/guc_hooks.h"
60 : : #include "utils/injection_point.h"
61 : : #include "utils/varlena.h"
62 : :
63 : : /*
64 : : * Replication slot on-disk data structure.
65 : : */
66 : : typedef struct ReplicationSlotOnDisk
67 : : {
68 : : /* first part of this struct needs to be version independent */
69 : :
70 : : /* data not covered by checksum */
71 : : uint32 magic;
72 : : pg_crc32c checksum;
73 : :
74 : : /* data covered by checksum */
75 : : uint32 version;
76 : : uint32 length;
77 : :
78 : : /*
79 : : * The actual data in the slot that follows can differ based on the above
80 : : * 'version'.
81 : : */
82 : :
83 : : ReplicationSlotPersistentData slotdata;
84 : : } ReplicationSlotOnDisk;
85 : :
86 : : /*
87 : : * Struct for the configuration of synchronized_standby_slots.
88 : : *
89 : : * Note: this must be a flat representation that can be held in a single chunk
90 : : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 : : * synchronized_standby_slots GUC.
92 : : */
93 : : typedef struct
94 : : {
95 : : /* Number of slot names in the slot_names[] */
96 : : int nslotnames;
97 : :
98 : : /*
99 : : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 : : */
101 : : char slot_names[FLEXIBLE_ARRAY_MEMBER];
102 : : } SyncStandbySlotsConfigData;
103 : :
104 : : /*
105 : : * Lookup table for slot invalidation causes.
106 : : */
107 : : typedef struct SlotInvalidationCauseMap
108 : : {
109 : : ReplicationSlotInvalidationCause cause;
110 : : const char *cause_name;
111 : : } SlotInvalidationCauseMap;
112 : :
113 : : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
114 : : {RS_INVAL_NONE, "none"},
115 : : {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 : : {RS_INVAL_HORIZON, "rows_removed"},
117 : : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 : : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119 : : };
120 : :
121 : : /*
122 : : * Ensure that the lookup table is up-to-date with the enums defined in
123 : : * ReplicationSlotInvalidationCause.
124 : : */
125 : : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
126 : : "array length mismatch");
127 : :
128 : : /* size of version independent data */
129 : : #define ReplicationSlotOnDiskConstantSize \
130 : : offsetof(ReplicationSlotOnDisk, slotdata)
131 : : /* size of the part of the slot not covered by the checksum */
132 : : #define ReplicationSlotOnDiskNotChecksummedSize \
133 : : offsetof(ReplicationSlotOnDisk, version)
134 : : /* size of the part covered by the checksum */
135 : : #define ReplicationSlotOnDiskChecksummedSize \
136 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
137 : : /* size of the slot data that is version dependent */
138 : : #define ReplicationSlotOnDiskV2Size \
139 : : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
140 : :
141 : : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
142 : : #define SLOT_VERSION 5 /* version for new files */
143 : :
144 : : /* Control array for replication slot management */
145 : : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
146 : :
147 : : /* My backend's replication slot in the shared memory array */
148 : : ReplicationSlot *MyReplicationSlot = NULL;
149 : :
150 : : /* GUC variables */
151 : : int max_replication_slots = 10; /* the maximum number of replication
152 : : * slots */
153 : :
154 : : /*
155 : : * Invalidate replication slots that have remained idle longer than this
156 : : * duration; '0' disables it.
157 : : */
158 : : int idle_replication_slot_timeout_secs = 0;
159 : :
160 : : /*
161 : : * This GUC lists streaming replication standby server slot names that
162 : : * logical WAL sender processes will wait for.
163 : : */
164 : : char *synchronized_standby_slots;
165 : :
166 : : /* This is the parsed and cached configuration for synchronized_standby_slots */
167 : : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
168 : :
169 : : /*
170 : : * Oldest LSN that has been confirmed to be flushed to the standbys
171 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 : : */
173 : : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
174 : :
175 : : static void ReplicationSlotShmemExit(int code, Datum arg);
176 : : static bool IsSlotForConflictCheck(const char *name);
177 : : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178 : :
179 : : /* internal persistency functions */
180 : : static void RestoreSlotFromDisk(const char *name);
181 : : static void CreateSlotOnDisk(ReplicationSlot *slot);
182 : : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 : :
184 : : /*
185 : : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : : */
187 : : Size
4287 rhaas@postgresql.org 188 :CBC 4045 : ReplicationSlotsShmemSize(void)
189 : : {
190 : 4045 : Size size = 0;
191 : :
192 [ + + ]: 4045 : if (max_replication_slots == 0)
4287 rhaas@postgresql.org 193 :GBC 2 : return size;
194 : :
4287 rhaas@postgresql.org 195 :CBC 4043 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 : 4043 : size = add_size(size,
197 : : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 : :
199 : 4043 : return size;
200 : : }
201 : :
202 : : /*
203 : : * Allocate and initialize shared memory for replication slots.
204 : : */
205 : : void
206 : 1049 : ReplicationSlotsShmemInit(void)
207 : : {
208 : : bool found;
209 : :
210 [ + + ]: 1049 : if (max_replication_slots == 0)
4287 rhaas@postgresql.org 211 :GBC 1 : return;
212 : :
4287 rhaas@postgresql.org 213 :CBC 1048 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 : 1048 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : : &found);
216 : :
217 [ + - ]: 1048 : if (!found)
218 : : {
219 : : int i;
220 : :
221 : : /* First time through, so initialize */
222 [ + - + - : 1948 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
+ - + + +
+ ]
223 : :
224 [ + + ]: 11379 : for (i = 0; i < max_replication_slots; i++)
225 : : {
226 : 10331 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 : :
228 : : /* everything else is zeroed by the memset above */
229 : 10331 : SpinLockInit(&slot->mutex);
1991 tgl@sss.pgh.pa.us 230 : 10331 : LWLockInitialize(&slot->io_in_progress_lock,
231 : : LWTRANCHE_REPLICATION_SLOT_IO);
3016 alvherre@alvh.no-ip. 232 : 10331 : ConditionVariableInit(&slot->active_cv);
233 : : }
234 : : }
235 : : }
236 : :
237 : : /*
238 : : * Register the callback for replication slot cleanup and releasing.
239 : : */
240 : : void
1351 andres@anarazel.de 241 : 19082 : ReplicationSlotInitialize(void)
242 : : {
243 : 19082 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 : 19082 : }
245 : :
246 : : /*
247 : : * Release and cleanup replication slots.
248 : : */
249 : : static void
250 : 19082 : ReplicationSlotShmemExit(int code, Datum arg)
251 : : {
252 : : /* Make sure active replication slots are released */
253 [ + + ]: 19082 : if (MyReplicationSlot != NULL)
254 : 255 : ReplicationSlotRelease();
255 : :
256 : : /* Also cleanup all the temporary slots. */
550 akapila@postgresql.o 257 : 19082 : ReplicationSlotCleanup(false);
1351 andres@anarazel.de 258 : 19082 : }
259 : :
260 : : /*
261 : : * Check whether the passed slot name is valid and report errors at elevel.
262 : : *
263 : : * See comments for ReplicationSlotValidateNameInternal().
264 : : */
265 : : bool
5 fujii@postgresql.org 266 :GNC 735 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 : : int elevel)
268 : : {
269 : : int err_code;
5 fujii@postgresql.org 270 :CBC 735 : char *err_msg = NULL;
271 : 735 : char *err_hint = NULL;
272 : :
5 fujii@postgresql.org 273 [ + + ]:GNC 735 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 : : &err_code, &err_msg, &err_hint))
275 : : {
276 : : /*
277 : : * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 : : * and errhint(), since the messages from
279 : : * ReplicationSlotValidateNameInternal() are already translated. This
280 : : * avoids double translation.
281 : : */
5 fujii@postgresql.org 282 [ + - + + ]:CBC 4 : ereport(elevel,
283 : : errcode(err_code),
284 : : errmsg_internal("%s", err_msg),
285 : : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286 : :
5 fujii@postgresql.org 287 :UBC 0 : pfree(err_msg);
288 [ # # ]: 0 : if (err_hint != NULL)
289 : 0 : pfree(err_hint);
290 : 0 : return false;
291 : : }
292 : :
5 fujii@postgresql.org 293 :CBC 731 : return true;
294 : : }
295 : :
296 : : /*
297 : : * Check whether the passed slot name is valid.
298 : : *
299 : : * An error will be reported for a reserved replication slot name if
300 : : * allow_reserved_name is set to false.
301 : : *
302 : : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 : : * the name to be used as a directory name on every supported OS.
304 : : *
305 : : * Returns true if the slot name is valid. Otherwise, returns false and stores
306 : : * the error code, error message, and optional hint in err_code, err_msg, and
307 : : * err_hint, respectively. The caller is responsible for freeing err_msg and
308 : : * err_hint, which are palloc'd.
309 : : */
310 : : bool
5 fujii@postgresql.org 311 :GNC 866 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 : : int *err_code, char **err_msg, char **err_hint)
313 : : {
314 : : const char *cp;
315 : :
4287 rhaas@postgresql.org 316 [ + + ]:CBC 866 : if (strlen(name) == 0)
317 : : {
5 fujii@postgresql.org 318 : 3 : *err_code = ERRCODE_INVALID_NAME;
319 : 3 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 : 3 : *err_hint = NULL;
4287 rhaas@postgresql.org 321 : 3 : return false;
322 : : }
323 : :
324 [ - + ]: 863 : if (strlen(name) >= NAMEDATALEN)
325 : : {
5 fujii@postgresql.org 326 :UBC 0 : *err_code = ERRCODE_NAME_TOO_LONG;
327 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 : 0 : *err_hint = NULL;
4287 rhaas@postgresql.org 329 : 0 : return false;
330 : : }
331 : :
4287 rhaas@postgresql.org 332 [ + + ]:CBC 16931 : for (cp = name; *cp; cp++)
333 : : {
334 [ + + - + ]: 16069 : if (!((*cp >= 'a' && *cp <= 'z')
335 [ + - + + ]: 8304 : || (*cp >= '0' && *cp <= '9')
336 [ + + ]: 1619 : || (*cp == '_')))
337 : : {
5 fujii@postgresql.org 338 : 1 : *err_code = ERRCODE_INVALID_NAME;
339 : 1 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 : 1 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
4287 rhaas@postgresql.org 341 : 1 : return false;
342 : : }
343 : : }
344 : :
96 akapila@postgresql.o 345 [ + + - + ]:GNC 862 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 : : {
5 fujii@postgresql.org 347 :UNC 0 : *err_code = ERRCODE_RESERVED_NAME;
348 : 0 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 : 0 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 : : CONFLICT_DETECTION_SLOT);
96 akapila@postgresql.o 351 : 0 : return false;
352 : : }
353 : :
4287 rhaas@postgresql.org 354 :CBC 862 : return true;
355 : : }
356 : :
357 : : /*
358 : : * Return true if the replication slot name is "pg_conflict_detection".
359 : : */
360 : : static bool
96 akapila@postgresql.o 361 :GNC 1982 : IsSlotForConflictCheck(const char *name)
362 : : {
363 : 1982 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364 : : }
365 : :
366 : : /*
367 : : * Create a new replication slot and mark it as used by this backend.
368 : : *
369 : : * name: Name of the slot
370 : : * db_specific: logical decoding is db specific; if the slot is going to
371 : : * be used for that pass true, otherwise false.
372 : : * two_phase: Allows decoding of prepared transactions. We allow this option
373 : : * to be enabled only at the slot creation time. If we allow this option
374 : : * to be changed during decoding then it is quite possible that we skip
375 : : * prepare first time because this option was not enabled. Now next time
376 : : * during getting changes, if the two_phase option is enabled it can skip
377 : : * prepare because by that time start decoding point has been moved. So the
378 : : * user will only get commit prepared.
379 : : * failover: If enabled, allows the slot to be synced to standbys so
380 : : * that logical replication can be resumed after failover.
381 : : * synced: True if the slot is synchronized from the primary server.
382 : : */
383 : : void
4256 rhaas@postgresql.org 384 :CBC 620 : ReplicationSlotCreate(const char *name, bool db_specific,
385 : : ReplicationSlotPersistency persistency,
386 : : bool two_phase, bool failover, bool synced)
387 : : {
4287 388 : 620 : ReplicationSlot *slot = NULL;
389 : : int i;
390 : :
391 [ - + ]: 620 : Assert(MyReplicationSlot == NULL);
392 : :
393 : : /*
394 : : * The logical launcher or pg_upgrade may create or migrate an internal
395 : : * slot, so using a reserved name is allowed in these cases.
396 : : */
96 akapila@postgresql.o 397 [ + + + + ]:GNC 620 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
398 : : ERROR);
399 : :
621 akapila@postgresql.o 400 [ + + ]:CBC 619 : if (failover)
401 : : {
402 : : /*
403 : : * Do not allow users to create the failover enabled slots on the
404 : : * standby as we do not support sync to the cascading standby.
405 : : *
406 : : * However, failover enabled slots can be created during slot
407 : : * synchronization because we need to retain the same values as the
408 : : * remote slot.
409 : : */
410 [ + + - + ]: 22 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
621 akapila@postgresql.o 411 [ # # ]:UBC 0 : ereport(ERROR,
412 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 : : errmsg("cannot enable failover for a replication slot created on the standby"));
414 : :
415 : : /*
416 : : * Do not allow users to create failover enabled temporary slots,
417 : : * because temporary slots will not be synced to the standby.
418 : : *
419 : : * However, failover enabled temporary slots can be created during
420 : : * slot synchronization. See the comments atop slotsync.c for details.
421 : : */
621 akapila@postgresql.o 422 [ + + + + ]:CBC 22 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
423 [ + - ]: 1 : ereport(ERROR,
424 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425 : : errmsg("cannot enable failover for a temporary replication slot"));
426 : : }
427 : :
428 : : /*
429 : : * If some other backend ran this code concurrently with us, we'd likely
430 : : * both allocate the same slot, and that would be bad. We'd also be at
431 : : * risk of missing a name collision. Also, we don't want to try to create
432 : : * a new slot while somebody's busy cleaning up an old one, because we
433 : : * might both be monkeying with the same directory.
434 : : */
4287 rhaas@postgresql.org 435 : 618 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
436 : :
437 : : /*
438 : : * Check for name collision, and identify an allocatable slot. We need to
439 : : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
440 : : * else can change the in_use flags while we're looking at them.
441 : : */
442 : 618 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
443 [ + + ]: 6032 : for (i = 0; i < max_replication_slots; i++)
444 : : {
445 : 5417 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
446 : :
447 [ + + + + ]: 5417 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
448 [ + - ]: 3 : ereport(ERROR,
449 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
450 : : errmsg("replication slot \"%s\" already exists", name)));
451 [ + + + + ]: 5414 : if (!s->in_use && slot == NULL)
452 : 614 : slot = s;
453 : : }
454 : 615 : LWLockRelease(ReplicationSlotControlLock);
455 : :
456 : : /* If all slots are in use, we're out of luck. */
457 [ + + ]: 615 : if (slot == NULL)
458 [ + - ]: 1 : ereport(ERROR,
459 : : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
460 : : errmsg("all replication slots are in use"),
461 : : errhint("Free one or increase \"max_replication_slots\".")));
462 : :
463 : : /*
464 : : * Since this slot is not in use, nobody should be looking at any part of
465 : : * it other than the in_use field unless they're trying to allocate it.
466 : : * And since we hold ReplicationSlotAllocationLock, nobody except us can
467 : : * be doing that. So it's safe to initialize the slot.
468 : : */
469 [ - + ]: 614 : Assert(!slot->in_use);
3842 andres@anarazel.de 470 [ - + ]: 614 : Assert(slot->active_pid == 0);
471 : :
472 : : /* first initialize persistent data */
3358 473 : 614 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
1904 peter@eisentraut.org 474 : 614 : namestrcpy(&slot->data.name, name);
4287 rhaas@postgresql.org 475 [ + + ]: 614 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
3358 andres@anarazel.de 476 : 614 : slot->data.persistency = persistency;
1699 akapila@postgresql.o 477 : 614 : slot->data.two_phase = two_phase;
1566 478 : 614 : slot->data.two_phase_at = InvalidXLogRecPtr;
641 479 : 614 : slot->data.failover = failover;
621 480 : 614 : slot->data.synced = synced;
481 : :
482 : : /* and then data only present in shared memory */
3358 andres@anarazel.de 483 : 614 : slot->just_dirtied = false;
484 : 614 : slot->dirty = false;
485 : 614 : slot->effective_xmin = InvalidTransactionId;
486 : 614 : slot->effective_catalog_xmin = InvalidTransactionId;
487 : 614 : slot->candidate_catalog_xmin = InvalidTransactionId;
488 : 614 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
489 : 614 : slot->candidate_restart_valid = InvalidXLogRecPtr;
490 : 614 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
774 akapila@postgresql.o 491 : 614 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
135 akorotkov@postgresql 492 : 614 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
579 akapila@postgresql.o 493 : 614 : slot->inactive_since = 0;
494 : :
495 : : /*
496 : : * Create the slot on disk. We haven't actually marked the slot allocated
497 : : * yet, so no special cleanup is required if this errors out.
498 : : */
4287 rhaas@postgresql.org 499 : 614 : CreateSlotOnDisk(slot);
500 : :
501 : : /*
502 : : * We need to briefly prevent any other backend from iterating over the
503 : : * slots while we flip the in_use flag. We also need to set the active
504 : : * flag while holding the ControlLock as otherwise a concurrent
505 : : * ReplicationSlotAcquire() could acquire the slot as well.
506 : : */
507 : 614 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
508 : :
509 : 614 : slot->in_use = true;
510 : :
511 : : /* We can now mark the slot active, and that makes it our slot. */
3674 512 [ - + ]: 614 : SpinLockAcquire(&slot->mutex);
513 [ - + ]: 614 : Assert(slot->active_pid == 0);
514 : 614 : slot->active_pid = MyProcPid;
515 : 614 : SpinLockRelease(&slot->mutex);
516 : 614 : MyReplicationSlot = slot;
517 : :
4287 518 : 614 : LWLockRelease(ReplicationSlotControlLock);
519 : :
520 : : /*
521 : : * Create statistics entry for the new logical slot. We don't collect any
522 : : * stats for physical slots, so no need to create an entry for the same.
523 : : * See ReplicationSlotDropPtr for why we need to do this before releasing
524 : : * ReplicationSlotAllocationLock.
525 : : */
1845 akapila@postgresql.o 526 [ + + ]: 614 : if (SlotIsLogical(slot))
1300 andres@anarazel.de 527 : 440 : pgstat_create_replslot(slot);
528 : :
529 : : /*
530 : : * Now that the slot has been marked as in_use and active, it's safe to
531 : : * let somebody else try to allocate a slot.
532 : : */
4287 rhaas@postgresql.org 533 : 614 : LWLockRelease(ReplicationSlotAllocationLock);
534 : :
535 : : /* Let everybody know we've modified this slot */
3016 alvherre@alvh.no-ip. 536 : 614 : ConditionVariableBroadcast(&slot->active_cv);
4287 rhaas@postgresql.org 537 : 614 : }
538 : :
539 : : /*
540 : : * Search for the named replication slot.
541 : : *
542 : : * Return the replication slot if found, otherwise NULL.
543 : : */
544 : : ReplicationSlot *
1644 akapila@postgresql.o 545 : 1822 : SearchNamedReplicationSlot(const char *name, bool need_lock)
546 : : {
547 : : int i;
548 : 1822 : ReplicationSlot *slot = NULL;
549 : :
550 [ + + ]: 1822 : if (need_lock)
551 : 512 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
552 : :
4287 rhaas@postgresql.org 553 [ + + ]: 6810 : for (i = 0; i < max_replication_slots; i++)
554 : : {
555 : 6384 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
556 : :
557 [ + + + + ]: 6384 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
558 : : {
559 : 1396 : slot = s;
560 : 1396 : break;
561 : : }
562 : : }
563 : :
1644 akapila@postgresql.o 564 [ + + ]: 1822 : if (need_lock)
565 : 512 : LWLockRelease(ReplicationSlotControlLock);
566 : :
1956 fujii@postgresql.org 567 : 1822 : return slot;
568 : : }
569 : :
570 : : /*
571 : : * Return the index of the replication slot in
572 : : * ReplicationSlotCtl->replication_slots.
573 : : *
574 : : * This is mainly useful to have an efficient key for storing replication slot
575 : : * stats.
576 : : */
577 : : int
1300 andres@anarazel.de 578 : 7701 : ReplicationSlotIndex(ReplicationSlot *slot)
579 : : {
580 [ + - - + ]: 7701 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
581 : : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
582 : :
583 : 7701 : return slot - ReplicationSlotCtl->replication_slots;
584 : : }
585 : :
586 : : /*
587 : : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
588 : : * the slot's name and true is returned.
589 : : *
590 : : * This likely is only useful for pgstat_replslot.c during shutdown, in other
591 : : * cases there are obvious TOCTOU issues.
592 : : */
593 : : bool
1115 594 : 89 : ReplicationSlotName(int index, Name name)
595 : : {
596 : : ReplicationSlot *slot;
597 : : bool found;
598 : :
599 : 89 : slot = &ReplicationSlotCtl->replication_slots[index];
600 : :
601 : : /*
602 : : * Ensure that the slot cannot be dropped while we copy the name. Don't
603 : : * need the spinlock as the name of an existing slot cannot change.
604 : : */
605 : 89 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
606 : 89 : found = slot->in_use;
607 [ + - ]: 89 : if (slot->in_use)
608 : 89 : namestrcpy(name, NameStr(slot->data.name));
609 : 89 : LWLockRelease(ReplicationSlotControlLock);
610 : :
611 : 89 : return found;
612 : : }
613 : :
614 : : /*
615 : : * Find a previously created slot and mark it as used by this process.
616 : : *
617 : : * An error is raised if nowait is true and the slot is currently in use. If
618 : : * nowait is false, we sleep until the slot is released by the owning process.
619 : : *
620 : : * An error is raised if error_if_invalid is true and the slot is found to
621 : : * be invalid. It should always be set to true, except when we are temporarily
622 : : * acquiring the slot and don't intend to change it.
623 : : */
624 : : void
269 akapila@postgresql.o 625 : 1232 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
626 : : {
627 : : ReplicationSlot *s;
628 : : int active_pid;
629 : :
1095 peter@eisentraut.org 630 [ + - ]: 1232 : Assert(name != NULL);
631 : :
1956 fujii@postgresql.org 632 : 1232 : retry:
633 [ - + ]: 1232 : Assert(MyReplicationSlot == NULL);
634 : :
635 : 1232 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
636 : :
637 : : /* Check if the slot exits with the given name. */
1599 alvherre@alvh.no-ip. 638 : 1232 : s = SearchNamedReplicationSlot(name, false);
1956 fujii@postgresql.org 639 [ + + - + ]: 1232 : if (s == NULL || !s->in_use)
640 : : {
641 : 9 : LWLockRelease(ReplicationSlotControlLock);
642 : :
4287 rhaas@postgresql.org 643 [ + - ]: 9 : ereport(ERROR,
644 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
645 : : errmsg("replication slot \"%s\" does not exist",
646 : : name)));
647 : : }
648 : :
649 : : /*
650 : : * Do not allow users to acquire the reserved slot. This scenario may
651 : : * occur if the launcher that owns the slot has terminated unexpectedly
652 : : * due to an error, and a backend process attempts to reuse the slot.
653 : : */
96 akapila@postgresql.o 654 [ + + - + ]:GNC 1223 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
96 akapila@postgresql.o 655 [ # # ]:UNC 0 : ereport(ERROR,
656 : : errcode(ERRCODE_UNDEFINED_OBJECT),
657 : : errmsg("cannot acquire replication slot \"%s\"", name),
658 : : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
659 : :
660 : : /*
661 : : * This is the slot we want; check if it's active under some other
662 : : * process. In single user mode, we don't need this check.
663 : : */
1956 fujii@postgresql.org 664 [ + + ]:CBC 1223 : if (IsUnderPostmaster)
665 : : {
666 : : /*
667 : : * Get ready to sleep on the slot in case it is active. (We may end
668 : : * up not sleeping, but we don't want to do this while holding the
669 : : * spinlock.)
670 : : */
1599 alvherre@alvh.no-ip. 671 [ + + ]: 1218 : if (!nowait)
1956 fujii@postgresql.org 672 : 267 : ConditionVariablePrepareToSleep(&s->active_cv);
673 : :
674 : : /*
675 : : * It is important to reset the inactive_since under spinlock here to
676 : : * avoid race conditions with slot invalidation. See comments related
677 : : * to inactive_since in InvalidatePossiblyObsoleteSlot.
678 : : */
679 [ - + ]: 1218 : SpinLockAcquire(&s->mutex);
680 [ + + ]: 1218 : if (s->active_pid == 0)
681 : 1082 : s->active_pid = MyProcPid;
682 : 1218 : active_pid = s->active_pid;
250 akapila@postgresql.o 683 : 1218 : ReplicationSlotSetInactiveSince(s, 0, false);
1956 fujii@postgresql.org 684 : 1218 : SpinLockRelease(&s->mutex);
685 : : }
686 : : else
687 : : {
68 michael@paquier.xyz 688 : 5 : s->active_pid = active_pid = MyProcPid;
250 akapila@postgresql.o 689 : 5 : ReplicationSlotSetInactiveSince(s, 0, true);
690 : : }
1956 fujii@postgresql.org 691 : 1223 : LWLockRelease(ReplicationSlotControlLock);
692 : :
693 : : /*
694 : : * If we found the slot but it's already active in another process, we
695 : : * wait until the owning process signals us that it's been released, or
696 : : * error out.
697 : : */
3245 peter_e@gmx.net 698 [ - + ]: 1223 : if (active_pid != MyProcPid)
699 : : {
1599 alvherre@alvh.no-ip. 700 [ # # ]:UBC 0 : if (!nowait)
701 : : {
702 : : /* Wait here until we get signaled, and then restart */
1596 703 : 0 : ConditionVariableSleep(&s->active_cv,
704 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
705 : 0 : ConditionVariableCancelSleep();
706 : 0 : goto retry;
707 : : }
708 : :
709 [ # # ]: 0 : ereport(ERROR,
710 : : (errcode(ERRCODE_OBJECT_IN_USE),
711 : : errmsg("replication slot \"%s\" is active for PID %d",
712 : : NameStr(s->data.name), active_pid)));
713 : : }
1599 alvherre@alvh.no-ip. 714 [ + + ]:CBC 1223 : else if (!nowait)
1629 tgl@sss.pgh.pa.us 715 : 267 : ConditionVariableCancelSleep(); /* no sleep needed after all */
716 : :
717 : : /* We made this slot active, so it's ours now. */
1956 fujii@postgresql.org 718 : 1223 : MyReplicationSlot = s;
719 : :
720 : : /*
721 : : * We need to check for invalidation after making the slot ours to avoid
722 : : * the possible race condition with the checkpointer that can otherwise
723 : : * invalidate the slot immediately after the check.
724 : : */
242 akapila@postgresql.o 725 [ + + + + ]: 1223 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
726 [ + - ]: 1 : ereport(ERROR,
727 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
728 : : errmsg("can no longer access replication slot \"%s\"",
729 : : NameStr(s->data.name)),
730 : : errdetail("This replication slot has been invalidated due to \"%s\".",
731 : : GetSlotInvalidationCauseName(s->data.invalidated)));
732 : :
733 : : /* Let everybody know we've modified this slot */
734 : 1222 : ConditionVariableBroadcast(&s->active_cv);
735 : :
736 : : /*
737 : : * The call to pgstat_acquire_replslot() protects against stats for a
738 : : * different slot, from before a restart or such, being present during
739 : : * pgstat_report_replslot().
740 : : */
1300 andres@anarazel.de 741 [ + + ]: 1222 : if (SlotIsLogical(s))
742 : 1028 : pgstat_acquire_replslot(s);
743 : :
744 : :
706 akapila@postgresql.o 745 [ + + ]: 1222 : if (am_walsender)
746 : : {
747 [ + - + - : 844 : ereport(log_replication_commands ? LOG : DEBUG1,
+ + ]
748 : : SlotIsLogical(s)
749 : : ? errmsg("acquired logical replication slot \"%s\"",
750 : : NameStr(s->data.name))
751 : : : errmsg("acquired physical replication slot \"%s\"",
752 : : NameStr(s->data.name)));
753 : : }
4287 rhaas@postgresql.org 754 : 1222 : }
755 : :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical = false; /* keep compiler quiet */
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_pid != 0);

	/*
	 * Capture the name/type before we potentially drop the slot below, so
	 * the log message at the bottom can still refer to it.
	 */
	if (am_walsender)
	{
		slotname = pstrdup(NameStr(slot->data.name));
		is_logical = SlotIsLogical(slot);
	}

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
844 : :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if ((s->active_pid == MyProcPid &&
			 (!synced_only || s->data.synced)))
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);

			/*
			 * We released ReplicationSlotControlLock to drop the slot, so
			 * the slot array may have changed; rescan from the start.
			 */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
886 : :
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * If 'nowait' is true, error out instead of waiting when the slot is
 * currently held by another backend.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	Assert(MyReplicationSlot == NULL);

	ReplicationSlotAcquire(name, nowait, false);

	/*
	 * Do not allow users to drop the slots which are currently being synced
	 * from the primary to the standby.
	 */
	if (RecoveryInProgress() && MyReplicationSlot->data.synced)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("cannot drop replication slot \"%s\"", name),
				errdetail("This replication slot is being synchronized from the primary server."));

	ReplicationSlotDropAcquired();
}
909 : :
/*
 * Change the definition of the slot identified by the specified name.
 *
 * 'failover' and 'two_phase' are optional; a NULL pointer means "leave that
 * property unchanged".  At least one of them must be non-NULL.  The slot is
 * acquired, updated (and persisted to disk if anything changed), and
 * released again before returning.
 */
void
ReplicationSlotAlter(const char *name, const bool *failover,
					 const bool *two_phase)
{
	bool		update_slot = false;

	Assert(MyReplicationSlot == NULL);
	Assert(failover || two_phase);

	ReplicationSlotAcquire(name, false, true);

	if (SlotIsPhysical(MyReplicationSlot))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("cannot use %s with a physical replication slot",
					   "ALTER_REPLICATION_SLOT"));

	if (RecoveryInProgress())
	{
		/*
		 * Do not allow users to alter the slots which are currently being
		 * synced from the primary to the standby.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot alter replication slot \"%s\"", name),
					errdetail("This replication slot is being synchronized from the primary server."));

		/*
		 * Do not allow users to enable failover on the standby as we do not
		 * support sync to the cascading standby.
		 */
		if (failover && *failover)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot"
						   " on the standby"));
	}

	if (failover)
	{
		/*
		 * Do not allow users to enable failover for temporary slots as we do
		 * not support syncing temporary slots to the standby.
		 */
		if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));

		if (MyReplicationSlot->data.failover != *failover)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.failover = *failover;
			SpinLockRelease(&MyReplicationSlot->mutex);

			update_slot = true;
		}
	}

	if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
	{
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.two_phase = *two_phase;
		SpinLockRelease(&MyReplicationSlot->mutex);

		update_slot = true;
	}

	/* Persist the change(s) so they survive a crash. */
	if (update_slot)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}

	ReplicationSlotRelease();
}
991 : :
992 : : /*
993 : : * Permanently drop the currently acquired replication slot.
994 : : */
995 : : void
4256 rhaas@postgresql.org 996 : 390 : ReplicationSlotDropAcquired(void)
997 : : {
998 : 390 : ReplicationSlot *slot = MyReplicationSlot;
999 : :
1000 [ - + ]: 390 : Assert(MyReplicationSlot != NULL);
1001 : :
1002 : : /* slot isn't acquired anymore */
1003 : 390 : MyReplicationSlot = NULL;
1004 : :
3245 peter_e@gmx.net 1005 : 390 : ReplicationSlotDropPtr(slot);
1006 : 390 : }
1007 : :
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.
		 * If fsync() fails, we can't be sure whether the changes are on disk
		 * or not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname(PG_REPLSLOT_DIR, true);
		END_CRIT_SECTION();
	}
	else
	{
		/* Ephemeral/temporary slots must not error out here; see above. */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot.  Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to
	 * create a slot while we're still cleaning up the detritus of the old
	 * one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
1114 : :
1115 : : /*
1116 : : * Serialize the currently acquired slot's state from memory to disk, thereby
1117 : : * guaranteeing the current state will survive a crash.
1118 : : */
1119 : : void
1120 : 1281 : ReplicationSlotSave(void)
1121 : : {
1122 : : char path[MAXPGPATH];
1123 : :
1124 [ - + ]: 1281 : Assert(MyReplicationSlot != NULL);
1125 : :
423 michael@paquier.xyz 1126 : 1281 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
4287 rhaas@postgresql.org 1127 : 1281 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1128 : 1281 : }
1129 : :
1130 : : /*
1131 : : * Signal that it would be useful if the currently acquired slot would be
1132 : : * flushed out to disk.
1133 : : *
1134 : : * Note that the actual flush to disk can be delayed for a long time, if
1135 : : * required for correctness explicitly do a ReplicationSlotSave().
1136 : : */
1137 : : void
1138 : 40409 : ReplicationSlotMarkDirty(void)
1139 : : {
3674 1140 : 40409 : ReplicationSlot *slot = MyReplicationSlot;
1141 : :
4287 1142 [ - + ]: 40409 : Assert(MyReplicationSlot != NULL);
1143 : :
3674 1144 [ - + ]: 40409 : SpinLockAcquire(&slot->mutex);
1145 : 40409 : MyReplicationSlot->just_dirtied = true;
1146 : 40409 : MyReplicationSlot->dirty = true;
1147 : 40409 : SpinLockRelease(&slot->mutex);
4287 1148 : 40409 : }
1149 : :
/*
 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 */
void
ReplicationSlotPersist(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.persistency != RS_PERSISTENT);

	SpinLockAcquire(&slot->mutex);
	slot->data.persistency = RS_PERSISTENT;
	SpinLockRelease(&slot->mutex);

	/* Write the new persistency out immediately so it survives a crash. */
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();
}
1169 : :
/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * If already_locked is true, ProcArrayLock has already been acquired
 * exclusively.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;
		bool		invalidated;

		if (!s->in_use)
			continue;

		/* Snapshot the fields we need under the slot's spinlock. */
		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	LWLockRelease(ReplicationSlotControlLock);

	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}
1224 : :
/*
 * Compute the oldest restart LSN across all slots and inform xlog module.
 *
 * Note: while max_slot_wal_keep_size is theoretically relevant for this
 * purpose, we don't try to account for that, because this module doesn't
 * know what to compare against.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
	int			i;
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;
		XLogRecPtr	last_saved_restart_lsn;
		bool		invalidated;
		ReplicationSlotPersistency persistency;

		if (!s->in_use)
			continue;

		/* Snapshot the fields we need under the slot's spinlock. */
		SpinLockAcquire(&s->mutex);
		persistency = s->data.persistency;
		restart_lsn = s->data.restart_lsn;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		last_saved_restart_lsn = s->last_saved_restart_lsn;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/*
		 * For persistent slot use last_saved_restart_lsn to compute the
		 * oldest LSN for removal of WAL segments.  The segments between
		 * last_saved_restart_lsn and restart_lsn might be needed by a
		 * persistent slot in the case of database crash.  Non-persistent
		 * slots can't survive the database crash, so we don't care about
		 * last_saved_restart_lsn for them.
		 */
		if (persistency == RS_PERSISTENT)
		{
			if (last_saved_restart_lsn != InvalidXLogRecPtr &&
				restart_lsn > last_saved_restart_lsn)
			{
				restart_lsn = last_saved_restart_lsn;
			}
		}

		if (restart_lsn != InvalidXLogRecPtr &&
			(min_required == InvalidXLogRecPtr ||
			 restart_lsn < min_required))
			min_required = restart_lsn;
	}
	LWLockRelease(ReplicationSlotControlLock);

	XLogSetReplicationSlotMinimumLSN(min_required);
}
1289 : :
/*
 * Compute the oldest WAL LSN required by *logical* decoding slots..
 *
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
 * slots exist.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)
{
	XLogRecPtr	result = InvalidXLogRecPtr;
	int			i;

	if (max_replication_slots <= 0)
		return InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		XLogRecPtr	restart_lsn;
		XLogRecPtr	last_saved_restart_lsn;
		bool		invalidated;
		ReplicationSlotPersistency persistency;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* we're only interested in logical slots */
		if (!SlotIsLogical(s))
			continue;

		/* read once, it's ok if it increases while we're checking */
		SpinLockAcquire(&s->mutex);
		persistency = s->data.persistency;
		restart_lsn = s->data.restart_lsn;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		last_saved_restart_lsn = s->last_saved_restart_lsn;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/*
		 * For persistent slot use last_saved_restart_lsn to compute the
		 * oldest LSN for removal of WAL segments.  The segments between
		 * last_saved_restart_lsn and restart_lsn might be needed by a
		 * persistent slot in the case of database crash.  Non-persistent
		 * slots can't survive the database crash, so we don't care about
		 * last_saved_restart_lsn for them.
		 */
		if (persistency == RS_PERSISTENT)
		{
			if (last_saved_restart_lsn != InvalidXLogRecPtr &&
				restart_lsn > last_saved_restart_lsn)
			{
				restart_lsn = last_saved_restart_lsn;
			}
		}

		if (restart_lsn == InvalidXLogRecPtr)
			continue;

		if (result == InvalidXLogRecPtr ||
			restart_lsn < result)
			result = restart_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return result;
}
1372 : :
1373 : : /*
1374 : : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1375 : : * passed database oid.
1376 : : *
1377 : : * Returns true if there are any slots referencing the database. *nslots will
1378 : : * be set to the absolute number of slots in the database, *nactive to ones
1379 : : * currently active.
1380 : : */
1381 : : bool
1382 : 45 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1383 : : {
1384 : : int i;
1385 : :
1386 : 45 : *nslots = *nactive = 0;
1387 : :
1388 [ - + ]: 45 : if (max_replication_slots <= 0)
4256 rhaas@postgresql.org 1389 :UBC 0 : return false;
1390 : :
4256 rhaas@postgresql.org 1391 :CBC 45 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1392 [ + + ]: 472 : for (i = 0; i < max_replication_slots; i++)
1393 : : {
1394 : : ReplicationSlot *s;
1395 : :
1396 : 427 : s = &ReplicationSlotCtl->replication_slots[i];
1397 : :
1398 : : /* cannot change while ReplicationSlotCtlLock is held */
1399 [ + + ]: 427 : if (!s->in_use)
1400 : 407 : continue;
1401 : :
1402 : : /* only logical slots are database specific, skip */
3730 andres@anarazel.de 1403 [ + + ]: 20 : if (!SlotIsLogical(s))
4252 bruce@momjian.us 1404 : 9 : continue;
1405 : :
1406 : : /* not our database, skip */
4256 rhaas@postgresql.org 1407 [ + + ]: 11 : if (s->data.database != dboid)
1408 : 8 : continue;
1409 : :
1410 : : /* NB: intentionally counting invalidated slots */
1411 : :
1412 : : /* count slots with spinlock held */
1413 [ - + ]: 3 : SpinLockAcquire(&s->mutex);
1414 : 3 : (*nslots)++;
3842 andres@anarazel.de 1415 [ + + ]: 3 : if (s->active_pid != 0)
4256 rhaas@postgresql.org 1416 : 1 : (*nactive)++;
1417 : 3 : SpinLockRelease(&s->mutex);
1418 : : }
1419 : 45 : LWLockRelease(ReplicationSlotControlLock);
1420 : :
1421 [ + + ]: 45 : if (*nslots > 0)
1422 : 3 : return true;
1423 : 42 : return false;
1424 : : }
1425 : :
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
1525 : :
1526 : :
/*
 * Check whether the server's configuration supports using replication
 * slots.  Errors out if max_replication_slots is zero or wal_level is
 * below "replica".
 */
void
CheckSlotRequirements(void)
{
	/*
	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
	 * needs the same check.
	 */

	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));

	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
}
1549 : :
1550 : : /*
1551 : : * Check whether the user has privilege to use replication slots.
 : : *
 : : * Raises ERROR unless the current role passes has_rolreplication()
 : : * (i.e. has the REPLICATION attribute); otherwise returns normally.
1552 : : */
1553 : : void
1504 michael@paquier.xyz 1554 : 524 : CheckSlotPermissions(void)
1555 : : {
956 peter@eisentraut.org 1556 [ + + ]: 524 : if (!has_rolreplication(GetUserId()))
1504 michael@paquier.xyz 1557 [ + - ]: 5 : ereport(ERROR,
1558 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1559 : : errmsg("permission denied to use replication slots"),
1560 : : errdetail("Only roles with the %s attribute may use replication slots.",
1561 : : "REPLICATION")));
1562 : 519 : }
1563 : :
1564 : : /*
1565 : : * Reserve WAL for the currently active slot.
1566 : : *
1567 : : * Compute and set restart_lsn in a manner that's appropriate for the type of
1568 : : * the slot and concurrency safe.
 : : *
 : : * Caller must have acquired MyReplicationSlot with no WAL reserved yet
 : : * (restart_lsn still invalid); the Asserts below enforce this.
1569 : : */
1570 : : void
3730 andres@anarazel.de 1571 : 572 : ReplicationSlotReserveWal(void)
1572 : : {
1573 : 572 : ReplicationSlot *slot = MyReplicationSlot;
1574 : :
1575 [ - + ]: 572 : Assert(slot != NULL);
1576 [ - + ]: 572 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
135 akorotkov@postgresql 1577 [ + - ]: 572 : Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
1578 : :
1579 : : /*
1580 : : * The replication slot mechanism is used to prevent removal of required
1581 : : * WAL. As there is no interlock between this routine and checkpoints, WAL
1582 : : * segments could concurrently be removed when a now stale return value of
1583 : : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1584 : : * this happens we'll just retry.
1585 : : */
1586 : : while (true)
3730 andres@anarazel.de 1587 :UBC 0 : {
1588 : : XLogSegNo segno;
1589 : : XLogRecPtr restart_lsn;
1590 : :
1591 : : /*
1592 : : * For logical slots log a standby snapshot and start logical decoding
1593 : : * at exactly that position. That allows the slot to start up more
1594 : : * quickly. But on a standby we cannot do WAL writes, so just use the
1595 : : * replay pointer; effectively, an attempt to create a logical slot on
1596 : : * standby will cause it to wait for an xl_running_xact record to be
1597 : : * logged independently on the primary, so that a snapshot can be
1598 : : * built using the record.
1599 : : *
1600 : : * None of this is needed (or indeed helpful) for physical slots as
1601 : : * they'll start replay at the last logged checkpoint anyway. Instead
1602 : : * return the location of the last redo LSN. While that slightly
1603 : : * increases the chance that we have to retry, it's where a base
1604 : : * backup has to start replay at.
1605 : : */
933 andres@anarazel.de 1606 [ + + ]:CBC 572 : if (SlotIsPhysical(slot))
1607 : 146 : restart_lsn = GetRedoRecPtr();
1608 [ - + ]: 426 : else if (RecoveryInProgress())
933 andres@anarazel.de 1609 :UBC 0 : restart_lsn = GetXLogReplayRecPtr(NULL);
1610 : : else
2694 michael@paquier.xyz 1611 :CBC 426 : restart_lsn = GetXLogInsertRecPtr();
1612 : :
933 andres@anarazel.de 1613 [ - + ]: 572 : SpinLockAcquire(&slot->mutex);
1614 : 572 : slot->data.restart_lsn = restart_lsn;
1615 : 572 : SpinLockRelease(&slot->mutex);
1616 : :
1617 : : /* prevent WAL removal as fast as possible */
3730 1618 : 572 : ReplicationSlotsComputeRequiredLSN();
1619 : :
1620 : : /*
1621 : : * If all required WAL is still there, great, otherwise retry. The
1622 : : * slot should prevent further removal of WAL, unless there's a
1623 : : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1624 : : * the new restart_lsn above, so normally we should never need to loop
1625 : : * more than twice.
1626 : : */
2960 1627 : 572 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
3730 1628 [ + - ]: 572 : if (XLogGetLastRemovedSegno() < segno)
1629 : 572 : break;
1630 : : }
1631 : :
933 1632 [ + + + + ]: 572 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1633 : : {
1634 : : XLogRecPtr flushptr;
1635 : :
1636 : : /* make sure we have enough information to start */
1637 : 426 : flushptr = LogStandbySnapshot();
1638 : :
1639 : : /* and make sure it's fsynced to disk */
1640 : 426 : XLogFlush(flushptr);
1641 : : }
3730 1642 : 572 : }
1643 : :
1644 : : /*
1645 : : * Report that replication slot needs to be invalidated
 : : *
 : : * "cause" selects the detail/hint text; "terminating" chooses between the
 : : * process-termination and slot-invalidation wordings. Emitted at LOG level.
1646 : : */
1647 : : static void
934 1648 : 6 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1649 : : bool terminating,
1650 : : int pid,
1651 : : NameData slotname,
1652 : : XLogRecPtr restart_lsn,
1653 : : XLogRecPtr oldestLSN,
1654 : : TransactionId snapshotConflictHorizon,
1655 : : long slot_idle_seconds)
1656 : : {
1657 : : StringInfoData err_detail;
1658 : : StringInfoData err_hint;
1659 : :
1660 : 6 : initStringInfo(&err_detail);
250 akapila@postgresql.o 1661 : 6 : initStringInfo(&err_hint);
1662 : :
934 andres@anarazel.de 1663 [ + - - - : 6 : switch (cause)
- - ]
1664 : : {
1665 : 6 : case RS_INVAL_WAL_REMOVED:
1666 : : {
212 peter@eisentraut.org 1667 : 6 : uint64 ex = oldestLSN - restart_lsn;
1668 : :
795 1669 : 6 : appendStringInfo(&err_detail,
112 alvherre@kurilemu.de 1670 :GNC 6 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1671 : : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1672 : : ex),
795 peter@eisentraut.org 1673 :CBC 6 : LSN_FORMAT_ARGS(restart_lsn),
1674 : : ex);
1675 : : /* translator: %s is a GUC variable name */
250 akapila@postgresql.o 1676 : 6 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1677 : : "max_slot_wal_keep_size");
795 peter@eisentraut.org 1678 : 6 : break;
1679 : : }
934 andres@anarazel.de 1680 :UBC 0 : case RS_INVAL_HORIZON:
1681 : 0 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1682 : : snapshotConflictHorizon);
1683 : 0 : break;
1684 : :
1685 : 0 : case RS_INVAL_WAL_LEVEL:
528 peter@eisentraut.org 1686 : 0 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
934 andres@anarazel.de 1687 : 0 : break;
1688 : :
250 akapila@postgresql.o 1689 : 0 : case RS_INVAL_IDLE_TIMEOUT:
1690 : : {
1691 : : /* translator: %s is a GUC variable name */
108 fujii@postgresql.org 1692 : 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1693 : : slot_idle_seconds, "idle_replication_slot_timeout",
1694 : : idle_replication_slot_timeout_secs);
1695 : : /* translator: %s is a GUC variable name */
250 akapila@postgresql.o 1696 : 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1697 : : "idle_replication_slot_timeout");
1698 : 0 : break;
1699 : : }
934 andres@anarazel.de 1700 : 0 : case RS_INVAL_NONE:
1701 : 0 : pg_unreachable();
1702 : : }
1703 : :
934 andres@anarazel.de 1704 [ + - + + :CBC 6 : ereport(LOG,
+ - ]
1705 : : terminating ?
1706 : : errmsg("terminating process %d to release replication slot \"%s\"",
1707 : : pid, NameStr(slotname)) :
1708 : : errmsg("invalidating obsolete replication slot \"%s\"",
1709 : : NameStr(slotname)),
1710 : : errdetail_internal("%s", err_detail.data),
1711 : : err_hint.len ? errhint("%s", err_hint.data) : 0);
1712 : :
1713 : 6 : pfree(err_detail.data);
250 akapila@postgresql.o 1714 : 6 : pfree(err_hint.data);
1715 : 6 : }
1716 : :
1717 : : /*
1718 : : * Can we invalidate an idle replication slot?
1719 : : *
1720 : : * Idle timeout invalidation is allowed only when:
1721 : : *
1722 : : * 1. Idle timeout is set
1723 : : * 2. Slot has reserved WAL
1724 : : * 3. Slot is inactive
1725 : : * 4. The slot is not being synced from the primary while the server is in
1726 : : * recovery. This is because synced slots are always considered to be
1727 : : * inactive because they don't perform logical decoding to produce changes.
 : : *
 : : * The numbered conditions map one-to-one onto the conjuncts of the return
 : : * expression below.
1728 : : */
1729 : : static inline bool
1730 : 351 : CanInvalidateIdleSlot(ReplicationSlot *s)
1731 : : {
108 fujii@postgresql.org 1732 : 351 : return (idle_replication_slot_timeout_secs != 0 &&
250 akapila@postgresql.o 1733 [ # # ]:UBC 0 : !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
250 akapila@postgresql.o 1734 [ - + - - ]:CBC 351 : s->inactive_since > 0 &&
250 akapila@postgresql.o 1735 [ # # # # ]:UBC 0 : !(RecoveryInProgress() && s->data.synced));
1736 : : }
1737 : :
1738 : : /*
1739 : : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1740 : : * becomes invalid among the given possible causes.
1741 : : *
1742 : : * This function sequentially checks all possible invalidation causes and
1743 : : * returns the first one for which the slot is eligible for invalidation.
 : : *
 : : * Returns RS_INVAL_NONE when none of the given causes applies. On an
 : : * idle-timeout hit, *inactive_since is also set to the timestamp used for
 : : * reporting the idle duration.
1744 : : */
1745 : : static ReplicationSlotInvalidationCause
250 akapila@postgresql.o 1746 :CBC 357 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1747 : : XLogRecPtr oldestLSN, Oid dboid,
1748 : : TransactionId snapshotConflictHorizon,
1749 : : TransactionId initial_effective_xmin,
1750 : : TransactionId initial_catalog_effective_xmin,
1751 : : XLogRecPtr initial_restart_lsn,
1752 : : TimestampTz *inactive_since, TimestampTz now)
1753 : : {
1754 [ - + ]: 357 : Assert(possible_causes != RS_INVAL_NONE);
1755 : :
1756 [ + - ]: 357 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1757 : : {
1758 [ + + + + ]: 357 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1759 : : initial_restart_lsn < oldestLSN)
1760 : 6 : return RS_INVAL_WAL_REMOVED;
1761 : : }
1762 : :
1763 [ - + ]: 351 : if (possible_causes & RS_INVAL_HORIZON)
1764 : : {
1765 : : /* invalid DB oid signals a shared relation */
250 akapila@postgresql.o 1766 [ # # # # ]:UBC 0 : if (SlotIsLogical(s) &&
1767 [ # # ]: 0 : (dboid == InvalidOid || dboid == s->data.database))
1768 : : {
1769 [ # # # # ]: 0 : if (TransactionIdIsValid(initial_effective_xmin) &&
1770 : 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1771 : : snapshotConflictHorizon))
1772 : 0 : return RS_INVAL_HORIZON;
1773 [ # # # # ]: 0 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1774 : 0 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1775 : : snapshotConflictHorizon))
1776 : 0 : return RS_INVAL_HORIZON;
1777 : : }
1778 : : }
1779 : :
250 akapila@postgresql.o 1780 [ - + ]:CBC 351 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1781 : : {
250 akapila@postgresql.o 1782 [ # # ]:UBC 0 : if (SlotIsLogical(s))
1783 : 0 : return RS_INVAL_WAL_LEVEL;
1784 : : }
1785 : :
250 akapila@postgresql.o 1786 [ + - ]:CBC 351 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1787 : : {
1788 [ - + ]: 351 : Assert(now > 0);
1789 : :
1790 [ - + ]: 351 : if (CanInvalidateIdleSlot(s))
1791 : : {
1792 : : /*
1793 : : * Simulate the invalidation due to idle_timeout to test the
1794 : : * timeout behavior promptly, without waiting for it to trigger
1795 : : * naturally.
1796 : : */
1797 : : #ifdef USE_INJECTION_POINTS
1798 : : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1799 : : {
1800 : : *inactive_since = 0; /* since the beginning of time */
1801 : : return RS_INVAL_IDLE_TIMEOUT;
1802 : : }
1803 : : #endif
1804 : :
1805 : : /*
1806 : : * Check if the slot needs to be invalidated due to
1807 : : * idle_replication_slot_timeout GUC.
1808 : : */
250 akapila@postgresql.o 1809 [ # # ]:UBC 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1810 : : idle_replication_slot_timeout_secs))
1811 : : {
1812 : 0 : *inactive_since = s->inactive_since;
1813 : 0 : return RS_INVAL_IDLE_TIMEOUT;
1814 : : }
1815 : : }
1816 : : }
1817 : :
250 akapila@postgresql.o 1818 :CBC 351 : return RS_INVAL_NONE;
1819 : : }
1820 : :
1821 : : /*
1822 : : * Helper for InvalidateObsoleteReplicationSlots
1823 : : *
1824 : : * Acquires the given slot and mark it invalid, if necessary and possible.
1825 : : *
1826 : : * Returns whether ReplicationSlotControlLock was released in the interim (and
1827 : : * in that case we're not holding the lock at return, otherwise we are).
1828 : : *
1829 : : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1830 : : *
1831 : : * This is inherently racy, because we release the LWLock
1832 : : * for syscalls, so caller must restart if we return true.
 : : *
 : : * If another process owns the slot, it is signalled (SIGTERM, or a
 : : * recovery-conflict signal when run by the startup process) and we sleep
 : : * on the slot's condition variable before retrying.
1833 : : */
1834 : : static bool
1835 : 363 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1836 : : ReplicationSlot *s,
1837 : : XLogRecPtr oldestLSN,
1838 : : Oid dboid, TransactionId snapshotConflictHorizon,
1839 : : bool *invalidated)
1840 : : {
1599 alvherre@alvh.no-ip. 1841 : 363 : int last_signaled_pid = 0;
1842 : 363 : bool released_lock = false;
615 michael@paquier.xyz 1843 : 363 : bool terminated = false;
564 1844 : 363 : TransactionId initial_effective_xmin = InvalidTransactionId;
1845 : 363 : TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
615 1846 : 363 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
584 akapila@postgresql.o 1847 : 363 : ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
250 1848 : 363 : TimestampTz inactive_since = 0;
1849 : :
1850 : : for (;;)
2029 alvherre@alvh.no-ip. 1851 : 2 : {
1852 : : XLogRecPtr restart_lsn;
1853 : : NameData slotname;
1599 1854 : 365 : int active_pid = 0;
584 akapila@postgresql.o 1855 : 365 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
250 1856 : 365 : TimestampTz now = 0;
1857 : 365 : long slot_idle_secs = 0;
1858 : :
1599 alvherre@alvh.no-ip. 1859 [ - + ]: 365 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1860 : :
2029 1861 [ - + ]: 365 : if (!s->in_use)
1862 : : {
1599 alvherre@alvh.no-ip. 1863 [ # # ]:UBC 0 : if (released_lock)
1864 : 0 : LWLockRelease(ReplicationSlotControlLock);
1865 : 0 : break;
1866 : : }
1867 : :
250 akapila@postgresql.o 1868 [ + - ]:CBC 365 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1869 : : {
1870 : : /*
1871 : : * Assign the current time here to avoid system call overhead
1872 : : * while holding the spinlock in subsequent code.
1873 : : */
1874 : 365 : now = GetCurrentTimestamp();
1875 : : }
1876 : :
1877 : : /*
1878 : : * Check if the slot needs to be invalidated. If it needs to be
1879 : : * invalidated, and is not currently acquired, acquire it and mark it
1880 : : * as having been invalidated. We do this with the spinlock held to
1881 : : * avoid race conditions -- for example the restart_lsn could move
1882 : : * forward, or the slot could be dropped.
1883 : : */
2029 alvherre@alvh.no-ip. 1884 [ - + ]: 365 : SpinLockAcquire(&s->mutex);
1885 : :
1886 : 365 : restart_lsn = s->data.restart_lsn;
1887 : :
1888 : : /* we do nothing if the slot is already invalid */
934 andres@anarazel.de 1889 [ + + ]: 365 : if (s->data.invalidated == RS_INVAL_NONE)
1890 : : {
1891 : : /*
1892 : : * The slot's mutex will be released soon, and it is possible that
1893 : : * those values change since the process holding the slot has been
1894 : : * terminated (if any), so record them here to ensure that we
1895 : : * would report the correct invalidation cause.
1896 : : *
1897 : : * Unlike other slot attributes, slot's inactive_since can't be
1898 : : * changed until the acquired slot is released or the owning
1899 : : * process is terminated. So, the inactive slot can only be
1900 : : * invalidated immediately without being terminated.
1901 : : */
615 michael@paquier.xyz 1902 [ + + ]: 357 : if (!terminated)
1903 : : {
1904 : 355 : initial_restart_lsn = s->data.restart_lsn;
1905 : 355 : initial_effective_xmin = s->effective_xmin;
1906 : 355 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1907 : : }
1908 : :
250 akapila@postgresql.o 1909 : 357 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1910 : : s, oldestLSN,
1911 : : dboid,
1912 : : snapshotConflictHorizon,
1913 : : initial_effective_xmin,
1914 : : initial_catalog_effective_xmin,
1915 : : initial_restart_lsn,
1916 : : &inactive_since,
1917 : : now);
1918 : : }
1919 : :
1920 : : /*
1921 : : * The invalidation cause recorded previously should not change while
1922 : : * the process owning the slot (if any) has been terminated.
1923 : : */
584 1924 [ + + + - : 365 : Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
- + ]
1925 : : invalidation_cause_prev != invalidation_cause));
1926 : :
1927 : : /* if there's no invalidation, we're done */
1928 [ + + ]: 365 : if (invalidation_cause == RS_INVAL_NONE)
1929 : : {
1599 alvherre@alvh.no-ip. 1930 : 359 : SpinLockRelease(&s->mutex);
1931 [ - + ]: 359 : if (released_lock)
1599 alvherre@alvh.no-ip. 1932 :UBC 0 : LWLockRelease(ReplicationSlotControlLock);
1599 alvherre@alvh.no-ip. 1933 :CBC 359 : break;
1934 : : }
1935 : :
1936 : 6 : slotname = s->data.name;
1937 : 6 : active_pid = s->active_pid;
1938 : :
1939 : : /*
1940 : : * If the slot can be acquired, do so and mark it invalidated
1941 : : * immediately. Otherwise we'll signal the owning process, below, and
1942 : : * retry.
1943 : : */
1944 [ + + ]: 6 : if (active_pid == 0)
1945 : : {
1946 : 4 : MyReplicationSlot = s;
1947 : 4 : s->active_pid = MyProcPid;
584 akapila@postgresql.o 1948 : 4 : s->data.invalidated = invalidation_cause;
1949 : :
1950 : : /*
1951 : : * XXX: We should consider not overwriting restart_lsn and instead
1952 : : * just rely on .invalidated.
1953 : : */
1954 [ + - ]: 4 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1955 : : {
934 andres@anarazel.de 1956 : 4 : s->data.restart_lsn = InvalidXLogRecPtr;
135 akorotkov@postgresql 1957 : 4 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
1958 : : }
1959 : :
1960 : : /* Let caller know */
1564 alvherre@alvh.no-ip. 1961 : 4 : *invalidated = true;
1962 : : }
1963 : :
1599 1964 : 6 : SpinLockRelease(&s->mutex);
1965 : :
1966 : : /*
1967 : : * Calculate the idle time duration of the slot if slot is marked
1968 : : * invalidated with RS_INVAL_IDLE_TIMEOUT.
1969 : : */
250 akapila@postgresql.o 1970 [ - + ]: 6 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1971 : : {
1972 : : int slot_idle_usecs;
1973 : :
250 akapila@postgresql.o 1974 :UBC 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
1975 : : &slot_idle_usecs);
1976 : : }
1977 : :
1599 alvherre@alvh.no-ip. 1978 [ + + ]:CBC 6 : if (active_pid != 0)
1979 : : {
1980 : : /*
1981 : : * Prepare the sleep on the slot's condition variable before
1982 : : * releasing the lock, to close a possible race condition if the
1983 : : * slot is released before the sleep below.
1984 : : */
1985 : 2 : ConditionVariablePrepareToSleep(&s->active_cv);
1986 : :
1987 : 2 : LWLockRelease(ReplicationSlotControlLock);
1988 : 2 : released_lock = true;
1989 : :
1990 : : /*
1991 : : * Signal to terminate the process that owns the slot, if we
1992 : : * haven't already signalled it. (Avoidance of repeated
1993 : : * signalling is the only reason for there to be a loop in this
1994 : : * routine; otherwise we could rely on caller's restart loop.)
1995 : : *
1996 : : * There is the race condition that other process may own the slot
1997 : : * after its current owner process is terminated and before this
1998 : : * process owns it. To handle that, we signal only if the PID of
1999 : : * the owning process has changed from the previous time. (This
2000 : : * logic assumes that the same PID is not reused very quickly.)
2001 : : */
2002 [ + - ]: 2 : if (last_signaled_pid != active_pid)
2003 : : {
584 akapila@postgresql.o 2004 : 2 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2005 : : slotname, restart_lsn,
2006 : : oldestLSN, snapshotConflictHorizon,
2007 : : slot_idle_secs);
2008 : :
934 andres@anarazel.de 2009 [ - + ]: 2 : if (MyBackendType == B_STARTUP)
934 andres@anarazel.de 2010 :UBC 0 : (void) SendProcSignal(active_pid,
2011 : : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
2012 : : INVALID_PROC_NUMBER);
2013 : : else
934 andres@anarazel.de 2014 :CBC 2 : (void) kill(active_pid, SIGTERM);
2015 : :
1599 alvherre@alvh.no-ip. 2016 : 2 : last_signaled_pid = active_pid;
615 michael@paquier.xyz 2017 : 2 : terminated = true;
584 akapila@postgresql.o 2018 : 2 : invalidation_cause_prev = invalidation_cause;
2019 : : }
2020 : :
2021 : : /* Wait until the slot is released. */
1599 alvherre@alvh.no-ip. 2022 : 2 : ConditionVariableSleep(&s->active_cv,
2023 : : WAIT_EVENT_REPLICATION_SLOT_DROP);
2024 : :
2025 : : /*
2026 : : * Re-acquire lock and start over; we expect to invalidate the
2027 : : * slot next time (unless another process acquires the slot in the
2028 : : * meantime).
2029 : : */
2030 : 2 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2031 : 2 : continue;
2032 : : }
2033 : : else
2034 : : {
2035 : : /*
2036 : : * We hold the slot now and have already invalidated it; flush it
2037 : : * to ensure that state persists.
2038 : : *
2039 : : * Don't want to hold ReplicationSlotControlLock across file
2040 : : * system operations, so release it now but be sure to tell caller
2041 : : * to restart from scratch.
2042 : : */
2043 : 4 : LWLockRelease(ReplicationSlotControlLock);
2044 : 4 : released_lock = true;
2045 : :
2046 : : /* Make sure the invalidated state persists across server restart */
2047 : 4 : ReplicationSlotMarkDirty();
2048 : 4 : ReplicationSlotSave();
2049 : 4 : ReplicationSlotRelease();
2050 : :
584 akapila@postgresql.o 2051 : 4 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2052 : : slotname, restart_lsn,
2053 : : oldestLSN, snapshotConflictHorizon,
2054 : : slot_idle_secs);
2055 : :
2056 : : /* done with this slot for now */
1599 alvherre@alvh.no-ip. 2057 : 4 : break;
2058 : : }
2059 : : }
2060 : :
2061 [ - + ]: 363 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2062 : :
2063 : 363 : return released_lock;
2064 : : }
2065 : :
2066 : : /*
2067 : : * Invalidate slots that require resources about to be removed.
2068 : : *
2069 : : * Returns true when any slot has been invalidated.
2070 : : *
2071 : : * Whether a slot needs to be invalidated depends on the invalidation cause.
2072 : : * A slot is invalidated if it:
2073 : : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2074 : : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2075 : : * db; dboid may be InvalidOid for shared relations
2076 : : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2077 : : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2078 : : * "idle_replication_slot_timeout" duration.
2079 : : *
2080 : : * Note: This function attempts to invalidate the slot for multiple possible
2081 : : * causes in a single pass, minimizing redundant iterations. The
2082 : : * "possible_causes" parameter can be a MASK representing one or more of the
2082 : : * defined causes.
2083 : : *
2084 : : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2085 : : */
2086 : : bool
250 akapila@postgresql.o 2087 : 1705 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2088 : : XLogSegNo oldestSegno, Oid dboid,
2089 : : TransactionId snapshotConflictHorizon)
2090 : : {
2091 : : XLogRecPtr oldestLSN;
1564 alvherre@alvh.no-ip. 2092 : 1705 : bool invalidated = false;
2093 : :
250 akapila@postgresql.o 2094 [ - + - - ]: 1705 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2095 [ + + - + ]: 1705 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2096 [ - + ]: 1705 : Assert(possible_causes != RS_INVAL_NONE);
2097 : :
934 andres@anarazel.de 2098 [ + + ]: 1705 : if (max_replication_slots == 0)
934 andres@anarazel.de 2099 :GBC 1 : return invalidated;
2100 : :
1599 alvherre@alvh.no-ip. 2101 :CBC 1704 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2102 : :
2103 : 1708 : restart:
2104 : 1708 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2105 [ + + ]: 18504 : for (int i = 0; i < max_replication_slots; i++)
2106 : : {
2107 : 16800 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2108 : :
2109 [ + + ]: 16800 : if (!s->in_use)
2110 : 16430 : continue;
2111 : :
2112 : : /* Prevent invalidation of logical slots during binary upgrade */
108 akapila@postgresql.o 2113 [ + + + + ]: 370 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2114 : 7 : continue;
2115 : :
250 2116 [ + + ]: 363 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2117 : : snapshotConflictHorizon,
2118 : : &invalidated))
2119 : : {
2120 : : /* if the lock was released, start from scratch */
1599 alvherre@alvh.no-ip. 2121 : 4 : goto restart;
2122 : : }
2123 : : }
2029 2124 : 1704 : LWLockRelease(ReplicationSlotControlLock);
2125 : :
2126 : : /*
2127 : : * If any slots have been invalidated, recalculate the resource limits.
2128 : : */
1564 2129 [ + + ]: 1704 : if (invalidated)
2130 : : {
2131 : 4 : ReplicationSlotsComputeRequiredXmin(false);
2132 : 4 : ReplicationSlotsComputeRequiredLSN();
2133 : : }
2134 : :
2135 : 1704 : return invalidated;
2136 : : }
2137 : :
2138 : : /*
2139 : : * Flush all replication slots to disk.
2140 : : *
2141 : : * It is convenient to flush dirty replication slots at the time of checkpoint.
2142 : : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2143 : : * for which the confirmed_flush LSN has been updated since the last time it
2144 : : * was saved and flush them.
 : : *
 : : * If any slot's last_saved_restart_lsn is about to change, the global
 : : * required LSN is recomputed after the flush loop.
2145 : : */
2146 : : void
774 akapila@postgresql.o 2147 : 1701 : CheckPointReplicationSlots(bool is_shutdown)
2148 : : {
2149 : : int i;
122 akorotkov@postgresql 2150 : 1701 : bool last_saved_restart_lsn_updated = false;
2151 : :
4003 peter_e@gmx.net 2152 [ + + ]: 1701 : elog(DEBUG1, "performing replication slot checkpoint");
2153 : :
2154 : : /*
2155 : : * Prevent any slot from being created/dropped while we're active. As we
2156 : : * explicitly do *not* want to block iterating over replication_slots or
2157 : : * acquiring a slot we cannot take the control lock - but that's OK,
2158 : : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2159 : : * enough to guarantee that nobody can change the in_use bits on us.
2160 : : */
4287 rhaas@postgresql.org 2161 : 1701 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2162 : :
2163 [ + + ]: 18457 : for (i = 0; i < max_replication_slots; i++)
2164 : : {
2165 : 16756 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2166 : : char path[MAXPGPATH];
2167 : :
2168 [ + + ]: 16756 : if (!s->in_use)
2169 : 16390 : continue;
2170 : :
2171 : : /* save the slot to disk, locking is handled in SaveSlotToPath() */
423 michael@paquier.xyz 2172 : 366 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2173 : :
2174 : : /*
2175 : : * Slot's data is not flushed each time the confirmed_flush LSN is
2176 : : * updated as that could lead to frequent writes. However, we decide
2177 : : * to force a flush of all logical slot's data at the time of shutdown
2178 : : * if the confirmed_flush LSN is changed since we last flushed it to
2179 : : * disk. This helps in avoiding an unnecessary retreat of the
2180 : : * confirmed_flush LSN after restart.
2181 : : */
774 akapila@postgresql.o 2182 [ + + + + ]: 366 : if (is_shutdown && SlotIsLogical(s))
2183 : : {
2184 [ - + ]: 77 : SpinLockAcquire(&s->mutex);
2185 : :
2186 [ + - ]: 77 : if (s->data.invalidated == RS_INVAL_NONE &&
503 2187 [ + + ]: 77 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2188 : : {
774 2189 : 37 : s->just_dirtied = true;
2190 : 37 : s->dirty = true;
2191 : : }
2192 : 77 : SpinLockRelease(&s->mutex);
2193 : : }
2194 : :
2195 : : /*
2196 : : * Track if we're going to update slot's last_saved_restart_lsn. We
2197 : : * need this to know if we need to recompute the required LSN.
2198 : : */
122 akorotkov@postgresql 2199 [ + + ]: 366 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2200 : 199 : last_saved_restart_lsn_updated = true;
2201 : :
4287 rhaas@postgresql.org 2202 : 366 : SaveSlotToPath(s, path, LOG);
2203 : : }
2204 : 1701 : LWLockRelease(ReplicationSlotAllocationLock);
2205 : :
2206 : : /*
2207 : : * Recompute the required LSN if SaveSlotToPath() updated
2208 : : * last_saved_restart_lsn for any slot.
2209 : : */
122 akorotkov@postgresql 2210 [ + + ]: 1701 : if (last_saved_restart_lsn_updated)
2211 : 199 : ReplicationSlotsComputeRequiredLSN();
4287 rhaas@postgresql.org 2212 : 1701 : }
2213 : :
2214 : : /*
2215 : : * Load all replication slots from disk into memory at server startup. This
2216 : : * needs to be run before we start crash recovery.
 : : *
 : : * Leftover "*.tmp" slot directories from an interrupted create/drop are
 : : * removed here; all other slot directories are restored.
2217 : : */
2218 : : void
4155 andres@anarazel.de 2219 : 907 : StartupReplicationSlots(void)
2220 : : {
2221 : : DIR *replication_dir;
2222 : : struct dirent *replication_de;
2223 : :
4003 peter_e@gmx.net 2224 [ + + ]: 907 : elog(DEBUG1, "starting up replication slots");
2225 : :
2226 : : /* restore all slots by iterating over all on-disk entries */
423 michael@paquier.xyz 2227 : 907 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2228 [ + + ]: 2819 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2229 : : {
2230 : : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2231 : : PGFileType de_type;
2232 : :
4287 rhaas@postgresql.org 2233 [ + + ]: 1912 : if (strcmp(replication_de->d_name, ".") == 0 ||
2234 [ + + ]: 1005 : strcmp(replication_de->d_name, "..") == 0)
2235 : 1814 : continue;
2236 : :
423 michael@paquier.xyz 2237 : 98 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
1151 2238 : 98 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2239 : :
2240 : : /* we're only creating directories here, skip if it's not ours */
2241 [ + - - + ]: 98 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
4287 rhaas@postgresql.org 2242 :UBC 0 : continue;
2243 : :
2244 : : /* we crashed while a slot was being setup or deleted, clean up */
3950 andres@anarazel.de 2245 [ - + ]:CBC 98 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2246 : : {
4287 rhaas@postgresql.org 2247 [ # # ]:UBC 0 : if (!rmtree(path, true))
2248 : : {
2249 [ # # ]: 0 : ereport(WARNING,
2250 : : (errmsg("could not remove directory \"%s\"",
2251 : : path)));
2252 : 0 : continue;
2253 : : }
423 michael@paquier.xyz 2254 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
4287 rhaas@postgresql.org 2255 : 0 : continue;
2256 : : }
2257 : :
2258 : : /* looks like a slot in a normal state, restore */
4287 rhaas@postgresql.org 2259 :CBC 98 : RestoreSlotFromDisk(replication_de->d_name);
2260 : : }
2261 : 907 : FreeDir(replication_dir);
2262 : :
2263 : : /* currently no slots exist, we're done. */
2264 [ + + ]: 907 : if (max_replication_slots <= 0)
4287 rhaas@postgresql.org 2265 :GBC 1 : return;
2266 : :
2267 : : /* Now that we have recovered all the data, compute replication xmin */
4256 rhaas@postgresql.org 2268 :CBC 906 : ReplicationSlotsComputeRequiredXmin(false);
4287 2269 : 906 : ReplicationSlotsComputeRequiredLSN();
2270 : : }
2271 : :
2272 : : /* ----
2273 : : * Manipulation of on-disk state of replication slots
2274 : : *
2275 : : * NB: none of the routines below should take any notice whether a slot is the
2276 : : * current one or not, that's all handled a layer above.
2277 : : * ----
2278 : : */
 : :
 : : /*
 : : * Create the on-disk state for a new slot: write the state file into a
 : : * "<name>.tmp" directory, then durably rename it into place and fsync
 : : * both the new directory and PG_REPLSLOT_DIR.
 : : */
2279 : : static void
2280 : 614 : CreateSlotOnDisk(ReplicationSlot *slot)
2281 : : {
2282 : : char tmppath[MAXPGPATH];
2283 : : char path[MAXPGPATH];
2284 : : struct stat st;
2285 : :
2286 : : /*
2287 : : * No need to take out the io_in_progress_lock, nobody else can see this
2288 : : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2289 : : * takes out the lock, if we'd take the lock here, we'd deadlock.
2290 : : */
2291 : :
423 michael@paquier.xyz 2292 : 614 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2293 : 614 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2294 : :
2295 : : /*
2296 : : * It's just barely possible that some previous effort to create or drop a
2297 : : * slot with this name left a temp directory lying around. If that seems
2298 : : * to be the case, try to remove it. If the rmtree() fails, we'll error
2299 : : * out at the MakePGDirectory() below, so we don't bother checking
2300 : : * success.
2301 : : */
4287 rhaas@postgresql.org 2302 [ - + - - ]: 614 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
4287 rhaas@postgresql.org 2303 :UBC 0 : rmtree(tmppath, true);
2304 : :
2305 : : /* Create and fsync the temporary slot directory. */
2760 sfrost@snowman.net 2306 [ - + ]:CBC 614 : if (MakePGDirectory(tmppath) < 0)
4287 rhaas@postgresql.org 2307 [ # # ]:UBC 0 : ereport(ERROR,
2308 : : (errcode_for_file_access(),
2309 : : errmsg("could not create directory \"%s\": %m",
2310 : : tmppath)));
4287 rhaas@postgresql.org 2311 :CBC 614 : fsync_fname(tmppath, true);
2312 : :
2313 : : /* Write the actual state file. */
4192 bruce@momjian.us 2314 : 614 : slot->dirty = true; /* signal that we really need to write */
4287 rhaas@postgresql.org 2315 : 614 : SaveSlotToPath(slot, tmppath, ERROR);
2316 : :
2317 : : /* Rename the directory into place. */
2318 [ - + ]: 614 : if (rename(tmppath, path) != 0)
4287 rhaas@postgresql.org 2319 [ # # ]:UBC 0 : ereport(ERROR,
2320 : : (errcode_for_file_access(),
2321 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2322 : : tmppath, path)));
2323 : :
2324 : : /*
2325 : : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2326 : : * would persist after an OS crash or not - so, force a restart. The
2327 : : * restart would try to fsync this again till it works.
2328 : : */
4287 rhaas@postgresql.org 2329 :CBC 614 : START_CRIT_SECTION();
2330 : :
2331 : 614 : fsync_fname(path, true);
423 michael@paquier.xyz 2332 : 614 : fsync_fname(PG_REPLSLOT_DIR, true);
2333 : :
4287 rhaas@postgresql.org 2334 [ - + ]: 614 : END_CRIT_SECTION();
2335 : 614 : }
2336 : :
2337 : : /*
2338 : : * Shared functionality between saving and creating a replication slot.
2339 : : */
2340 : : static void
2341 : 2261 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2342 : : {
2343 : : char tmppath[MAXPGPATH];
2344 : : char path[MAXPGPATH];
2345 : : int fd;
2346 : : ReplicationSlotOnDisk cp;
2347 : : bool was_dirty;
2348 : :
2349 : : /* first check whether there's something to write out */
3674 2350 [ - + ]: 2261 : SpinLockAcquire(&slot->mutex);
2351 : 2261 : was_dirty = slot->dirty;
2352 : 2261 : slot->just_dirtied = false;
2353 : 2261 : SpinLockRelease(&slot->mutex);
2354 : :
2355 : : /* and don't do anything if there's nothing to write */
4287 2356 [ + + ]: 2261 : if (!was_dirty)
2357 : 120 : return;
2358 : :
3559 2359 : 2141 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2360 : :
2361 : : /* silence valgrind :( */
4287 2362 : 2141 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2363 : :
2364 : 2141 : sprintf(tmppath, "%s/state.tmp", dir);
2365 : 2141 : sprintf(path, "%s/state", dir);
2366 : :
2956 peter_e@gmx.net 2367 : 2141 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
4287 rhaas@postgresql.org 2368 [ - + ]: 2141 : if (fd < 0)
2369 : : {
2370 : : /*
2371 : : * If not an ERROR, then release the lock before returning. In case
2372 : : * of an ERROR, the error recovery path automatically releases the
2373 : : * lock, but no harm in explicitly releasing even in that case. Note
2374 : : * that LWLockRelease() could affect errno.
2375 : : */
2031 peter@eisentraut.org 2376 :UBC 0 : int save_errno = errno;
2377 : :
2041 2378 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2031 2379 : 0 : errno = save_errno;
4287 rhaas@postgresql.org 2380 [ # # ]: 0 : ereport(elevel,
2381 : : (errcode_for_file_access(),
2382 : : errmsg("could not create file \"%s\": %m",
2383 : : tmppath)));
2384 : 0 : return;
2385 : : }
2386 : :
4287 rhaas@postgresql.org 2387 :CBC 2141 : cp.magic = SLOT_MAGIC;
4010 heikki.linnakangas@i 2388 : 2141 : INIT_CRC32C(cp.checksum);
4002 andres@anarazel.de 2389 : 2141 : cp.version = SLOT_VERSION;
2390 : 2141 : cp.length = ReplicationSlotOnDiskV2Size;
2391 : :
4287 rhaas@postgresql.org 2392 [ - + ]: 2141 : SpinLockAcquire(&slot->mutex);
2393 : :
2394 : 2141 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2395 : :
2396 : 2141 : SpinLockRelease(&slot->mutex);
2397 : :
4010 heikki.linnakangas@i 2398 : 2141 : COMP_CRC32C(cp.checksum,
2399 : : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2400 : : ReplicationSlotOnDiskChecksummedSize);
4002 andres@anarazel.de 2401 : 2141 : FIN_CRC32C(cp.checksum);
2402 : :
2640 michael@paquier.xyz 2403 : 2141 : errno = 0;
3145 rhaas@postgresql.org 2404 : 2141 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
4287 2405 [ - + ]: 2141 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2406 : : {
4192 bruce@momjian.us 2407 :UBC 0 : int save_errno = errno;
2408 : :
3145 rhaas@postgresql.org 2409 : 0 : pgstat_report_wait_end();
4287 2410 : 0 : CloseTransientFile(fd);
17 michael@paquier.xyz 2411 : 0 : unlink(tmppath);
2041 peter@eisentraut.org 2412 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2413 : :
2414 : : /* if write didn't set errno, assume problem is no disk space */
2681 michael@paquier.xyz 2415 [ # # ]: 0 : errno = save_errno ? save_errno : ENOSPC;
4287 rhaas@postgresql.org 2416 [ # # ]: 0 : ereport(elevel,
2417 : : (errcode_for_file_access(),
2418 : : errmsg("could not write to file \"%s\": %m",
2419 : : tmppath)));
2420 : 0 : return;
2421 : : }
3145 rhaas@postgresql.org 2422 :CBC 2141 : pgstat_report_wait_end();
2423 : :
2424 : : /* fsync the temporary file */
2425 : 2141 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
4287 2426 [ - + ]: 2141 : if (pg_fsync(fd) != 0)
2427 : : {
4192 bruce@momjian.us 2428 :UBC 0 : int save_errno = errno;
2429 : :
3145 rhaas@postgresql.org 2430 : 0 : pgstat_report_wait_end();
4287 2431 : 0 : CloseTransientFile(fd);
17 michael@paquier.xyz 2432 : 0 : unlink(tmppath);
2041 peter@eisentraut.org 2433 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2434 : :
4287 rhaas@postgresql.org 2435 : 0 : errno = save_errno;
2436 [ # # ]: 0 : ereport(elevel,
2437 : : (errcode_for_file_access(),
2438 : : errmsg("could not fsync file \"%s\": %m",
2439 : : tmppath)));
2440 : 0 : return;
2441 : : }
3145 rhaas@postgresql.org 2442 :CBC 2141 : pgstat_report_wait_end();
2443 : :
2305 peter@eisentraut.org 2444 [ - + ]: 2141 : if (CloseTransientFile(fd) != 0)
2445 : : {
2031 peter@eisentraut.org 2446 :UBC 0 : int save_errno = errno;
2447 : :
17 michael@paquier.xyz 2448 : 0 : unlink(tmppath);
2041 peter@eisentraut.org 2449 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2450 : :
2031 2451 : 0 : errno = save_errno;
2424 michael@paquier.xyz 2452 [ # # ]: 0 : ereport(elevel,
2453 : : (errcode_for_file_access(),
2454 : : errmsg("could not close file \"%s\": %m",
2455 : : tmppath)));
2385 2456 : 0 : return;
2457 : : }
2458 : :
2459 : : /* rename to permanent file, fsync file and directory */
4287 rhaas@postgresql.org 2460 [ - + ]:CBC 2141 : if (rename(tmppath, path) != 0)
2461 : : {
2031 peter@eisentraut.org 2462 :UBC 0 : int save_errno = errno;
2463 : :
17 michael@paquier.xyz 2464 : 0 : unlink(tmppath);
2041 peter@eisentraut.org 2465 : 0 : LWLockRelease(&slot->io_in_progress_lock);
2466 : :
2031 2467 : 0 : errno = save_errno;
4287 rhaas@postgresql.org 2468 [ # # ]: 0 : ereport(elevel,
2469 : : (errcode_for_file_access(),
2470 : : errmsg("could not rename file \"%s\" to \"%s\": %m",
2471 : : tmppath, path)));
2472 : 0 : return;
2473 : : }
2474 : :
2475 : : /*
2476 : : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2477 : : */
4287 rhaas@postgresql.org 2478 :CBC 2141 : START_CRIT_SECTION();
2479 : :
2480 : 2141 : fsync_fname(path, false);
3519 andres@anarazel.de 2481 : 2141 : fsync_fname(dir, true);
423 michael@paquier.xyz 2482 : 2141 : fsync_fname(PG_REPLSLOT_DIR, true);
2483 : :
4287 rhaas@postgresql.org 2484 [ - + ]: 2141 : END_CRIT_SECTION();
2485 : :
2486 : : /*
2487 : : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2488 : : * already and remember the confirmed_flush LSN value.
2489 : : */
3674 2490 [ - + ]: 2141 : SpinLockAcquire(&slot->mutex);
2491 [ + + ]: 2141 : if (!slot->just_dirtied)
2492 : 2131 : slot->dirty = false;
774 akapila@postgresql.o 2493 : 2141 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
135 akorotkov@postgresql 2494 : 2141 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
3674 rhaas@postgresql.org 2495 : 2141 : SpinLockRelease(&slot->mutex);
2496 : :
3559 2497 : 2141 : LWLockRelease(&slot->io_in_progress_lock);
2498 : : }
2499 : :
2500 : : /*
2501 : : * Load a single slot from disk into memory.
2502 : : */
2503 : : static void
4287 2504 : 98 : RestoreSlotFromDisk(const char *name)
2505 : : {
2506 : : ReplicationSlotOnDisk cp;
2507 : : int i;
2508 : : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2509 : : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2510 : : int fd;
2511 : 98 : bool restored = false;
2512 : : int readBytes;
2513 : : pg_crc32c checksum;
264 akapila@postgresql.o 2514 : 98 : TimestampTz now = 0;
2515 : :
2516 : : /* no need to lock here, no concurrent access allowed yet */
2517 : :
2518 : : /* delete temp file if it exists */
423 michael@paquier.xyz 2519 : 98 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2612 2520 : 98 : sprintf(path, "%s/state.tmp", slotdir);
4287 rhaas@postgresql.org 2521 [ + - - + ]: 98 : if (unlink(path) < 0 && errno != ENOENT)
4287 rhaas@postgresql.org 2522 [ # # ]:UBC 0 : ereport(PANIC,
2523 : : (errcode_for_file_access(),
2524 : : errmsg("could not remove file \"%s\": %m", path)));
2525 : :
2612 michael@paquier.xyz 2526 :CBC 98 : sprintf(path, "%s/state", slotdir);
2527 : :
4287 rhaas@postgresql.org 2528 [ + + ]: 98 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2529 : :
2530 : : /* on some operating systems fsyncing a file requires O_RDWR */
2215 andres@anarazel.de 2531 : 98 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2532 : :
2533 : : /*
2534 : : * We do not need to handle this as we are rename()ing the directory into
2535 : : * place only after we fsync()ed the state file.
2536 : : */
4287 rhaas@postgresql.org 2537 [ - + ]: 98 : if (fd < 0)
4287 rhaas@postgresql.org 2538 [ # # ]:UBC 0 : ereport(PANIC,
2539 : : (errcode_for_file_access(),
2540 : : errmsg("could not open file \"%s\": %m", path)));
2541 : :
2542 : : /*
2543 : : * Sync state file before we're reading from it. We might have crashed
2544 : : * while it wasn't synced yet and we shouldn't continue on that basis.
2545 : : */
3145 rhaas@postgresql.org 2546 :CBC 98 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
4287 2547 [ - + ]: 98 : if (pg_fsync(fd) != 0)
4287 rhaas@postgresql.org 2548 [ # # ]:UBC 0 : ereport(PANIC,
2549 : : (errcode_for_file_access(),
2550 : : errmsg("could not fsync file \"%s\": %m",
2551 : : path)));
3145 rhaas@postgresql.org 2552 :CBC 98 : pgstat_report_wait_end();
2553 : :
2554 : : /* Also sync the parent directory */
4287 2555 : 98 : START_CRIT_SECTION();
2612 michael@paquier.xyz 2556 : 98 : fsync_fname(slotdir, true);
4287 rhaas@postgresql.org 2557 [ - + ]: 98 : END_CRIT_SECTION();
2558 : :
2559 : : /* read part of statefile that's guaranteed to be version independent */
3145 2560 : 98 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
4287 2561 : 98 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
3145 2562 : 98 : pgstat_report_wait_end();
4287 2563 [ - + ]: 98 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2564 : : {
2658 michael@paquier.xyz 2565 [ # # ]:UBC 0 : if (readBytes < 0)
2566 [ # # ]: 0 : ereport(PANIC,
2567 : : (errcode_for_file_access(),
2568 : : errmsg("could not read file \"%s\": %m", path)));
2569 : : else
2570 [ # # ]: 0 : ereport(PANIC,
2571 : : (errcode(ERRCODE_DATA_CORRUPTED),
2572 : : errmsg("could not read file \"%s\": read %d of %zu",
2573 : : path, readBytes,
2574 : : (Size) ReplicationSlotOnDiskConstantSize)));
2575 : : }
2576 : :
2577 : : /* verify magic */
4287 rhaas@postgresql.org 2578 [ - + ]:CBC 98 : if (cp.magic != SLOT_MAGIC)
4287 rhaas@postgresql.org 2579 [ # # ]:UBC 0 : ereport(PANIC,
2580 : : (errcode(ERRCODE_DATA_CORRUPTED),
2581 : : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2582 : : path, cp.magic, SLOT_MAGIC)));
2583 : :
2584 : : /* verify version */
4287 rhaas@postgresql.org 2585 [ - + ]:CBC 98 : if (cp.version != SLOT_VERSION)
4287 rhaas@postgresql.org 2586 [ # # ]:UBC 0 : ereport(PANIC,
2587 : : (errcode(ERRCODE_DATA_CORRUPTED),
2588 : : errmsg("replication slot file \"%s\" has unsupported version %u",
2589 : : path, cp.version)));
2590 : :
2591 : : /* boundary check on length */
4002 andres@anarazel.de 2592 [ - + ]:CBC 98 : if (cp.length != ReplicationSlotOnDiskV2Size)
4287 rhaas@postgresql.org 2593 [ # # ]:UBC 0 : ereport(PANIC,
2594 : : (errcode(ERRCODE_DATA_CORRUPTED),
2595 : : errmsg("replication slot file \"%s\" has corrupted length %u",
2596 : : path, cp.length)));
2597 : :
2598 : : /* Now that we know the size, read the entire file */
3145 rhaas@postgresql.org 2599 :CBC 98 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
4287 2600 : 196 : readBytes = read(fd,
2601 : : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2602 : 98 : cp.length);
3145 2603 : 98 : pgstat_report_wait_end();
4287 2604 [ - + ]: 98 : if (readBytes != cp.length)
2605 : : {
2658 michael@paquier.xyz 2606 [ # # ]:UBC 0 : if (readBytes < 0)
2607 [ # # ]: 0 : ereport(PANIC,
2608 : : (errcode_for_file_access(),
2609 : : errmsg("could not read file \"%s\": %m", path)));
2610 : : else
2611 [ # # ]: 0 : ereport(PANIC,
2612 : : (errcode(ERRCODE_DATA_CORRUPTED),
2613 : : errmsg("could not read file \"%s\": read %d of %zu",
2614 : : path, readBytes, (Size) cp.length)));
2615 : : }
2616 : :
2305 peter@eisentraut.org 2617 [ - + ]:CBC 98 : if (CloseTransientFile(fd) != 0)
2424 michael@paquier.xyz 2618 [ # # ]:UBC 0 : ereport(PANIC,
2619 : : (errcode_for_file_access(),
2620 : : errmsg("could not close file \"%s\": %m", path)));
2621 : :
2622 : : /* now verify the CRC */
4010 heikki.linnakangas@i 2623 :CBC 98 : INIT_CRC32C(checksum);
2624 : 98 : COMP_CRC32C(checksum,
2625 : : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2626 : : ReplicationSlotOnDiskChecksummedSize);
4002 andres@anarazel.de 2627 : 98 : FIN_CRC32C(checksum);
2628 : :
4010 heikki.linnakangas@i 2629 [ - + ]: 98 : if (!EQ_CRC32C(checksum, cp.checksum))
4287 rhaas@postgresql.org 2630 [ # # ]:UBC 0 : ereport(PANIC,
2631 : : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2632 : : path, checksum, cp.checksum)));
2633 : :
2634 : : /*
2635 : : * If we crashed with an ephemeral slot active, don't restore but delete
2636 : : * it.
2637 : : */
4113 andres@anarazel.de 2638 [ - + ]:CBC 98 : if (cp.slotdata.persistency != RS_PERSISTENT)
2639 : : {
2612 michael@paquier.xyz 2640 [ # # ]:UBC 0 : if (!rmtree(slotdir, true))
2641 : : {
4113 andres@anarazel.de 2642 [ # # ]: 0 : ereport(WARNING,
2643 : : (errmsg("could not remove directory \"%s\"",
2644 : : slotdir)));
2645 : : }
423 michael@paquier.xyz 2646 : 0 : fsync_fname(PG_REPLSLOT_DIR, true);
4113 andres@anarazel.de 2647 : 0 : return;
2648 : : }
2649 : :
2650 : : /*
2651 : : * Verify that requirements for the specific slot type are met. That's
2652 : : * important because if these aren't met we're not guaranteed to retain
2653 : : * all the necessary resources for the slot.
2654 : : *
2655 : : * NB: We have to do so *after* the above checks for ephemeral slots,
2656 : : * because otherwise a slot that shouldn't exist anymore could prevent
2657 : : * restarts.
2658 : : *
2659 : : * NB: Changing the requirements here also requires adapting
2660 : : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2661 : : */
245 msawada@postgresql.o 2662 [ + + ]:CBC 98 : if (cp.slotdata.database != InvalidOid)
2663 : : {
2664 [ - + ]: 65 : if (wal_level < WAL_LEVEL_LOGICAL)
245 msawada@postgresql.o 2665 [ # # ]:UBC 0 : ereport(FATAL,
2666 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2667 : : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2668 : : NameStr(cp.slotdata.name)),
2669 : : errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2670 : :
2671 : : /*
2672 : : * In standby mode, the hot standby must be enabled. This check is
2673 : : * necessary to ensure logical slots are invalidated when they become
2674 : : * incompatible due to insufficient wal_level. Otherwise, if the
2675 : : * primary reduces wal_level < logical while hot standby is disabled,
2676 : : * logical slots would remain valid even after promotion.
2677 : : */
245 msawada@postgresql.o 2678 [ + + - + ]:CBC 65 : if (StandbyMode && !EnableHotStandby)
245 msawada@postgresql.o 2679 [ # # ]:UBC 0 : ereport(FATAL,
2680 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2681 : : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2682 : : NameStr(cp.slotdata.name)),
2683 : : errhint("Change \"hot_standby\" to be \"on\".")));
2684 : : }
2553 andres@anarazel.de 2685 [ - + ]:CBC 33 : else if (wal_level < WAL_LEVEL_REPLICA)
2553 andres@anarazel.de 2686 [ # # ]:UBC 0 : ereport(FATAL,
2687 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2688 : : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2689 : : NameStr(cp.slotdata.name)),
2690 : : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2691 : :
2692 : : /* nothing can be active yet, don't lock anything */
4287 rhaas@postgresql.org 2693 [ + - ]:CBC 143 : for (i = 0; i < max_replication_slots; i++)
2694 : : {
2695 : : ReplicationSlot *slot;
2696 : :
2697 : 143 : slot = &ReplicationSlotCtl->replication_slots[i];
2698 : :
2699 [ + + ]: 143 : if (slot->in_use)
2700 : 45 : continue;
2701 : :
2702 : : /* restore the entire set of persistent data */
2703 : 98 : memcpy(&slot->data, &cp.slotdata,
2704 : : sizeof(ReplicationSlotPersistentData));
2705 : :
2706 : : /* initialize in memory state */
2707 : 98 : slot->effective_xmin = cp.slotdata.xmin;
4256 2708 : 98 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
774 akapila@postgresql.o 2709 : 98 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
135 akorotkov@postgresql 2710 : 98 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2711 : :
4256 rhaas@postgresql.org 2712 : 98 : slot->candidate_catalog_xmin = InvalidTransactionId;
2713 : 98 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2714 : 98 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2715 : 98 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2716 : :
4287 2717 : 98 : slot->in_use = true;
3842 andres@anarazel.de 2718 : 98 : slot->active_pid = 0;
2719 : :
2720 : : /*
2721 : : * Set the time since the slot has become inactive after loading the
2722 : : * slot from the disk into memory. Whoever acquires the slot i.e.
2723 : : * makes the slot active will reset it. Use the same inactive_since
2724 : : * time for all the slots.
2725 : : */
264 akapila@postgresql.o 2726 [ + - ]: 98 : if (now == 0)
2727 : 98 : now = GetCurrentTimestamp();
2728 : :
2729 : 98 : ReplicationSlotSetInactiveSince(slot, now, false);
2730 : :
4287 rhaas@postgresql.org 2731 : 98 : restored = true;
2732 : 98 : break;
2733 : : }
2734 : :
2735 [ - + ]: 98 : if (!restored)
2551 michael@paquier.xyz 2736 [ # # ]:UBC 0 : ereport(FATAL,
2737 : : (errmsg("too many replication slots active before shutdown"),
2738 : : errhint("Increase \"max_replication_slots\" and try again.")));
2739 : : }
2740 : :
2741 : : /*
2742 : : * Maps an invalidation reason for a replication slot to
2743 : : * ReplicationSlotInvalidationCause.
2744 : : */
2745 : : ReplicationSlotInvalidationCause
250 akapila@postgresql.o 2746 : 0 : GetSlotInvalidationCause(const char *cause_name)
2747 : : {
2748 [ # # ]: 0 : Assert(cause_name);
2749 : :
2750 : : /* Search lookup table for the cause having this name */
2751 [ # # ]: 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2752 : : {
2753 [ # # ]: 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2754 : 0 : return SlotInvalidationCauses[i].cause;
2755 : : }
2756 : :
2757 : 0 : Assert(false);
2758 : : return RS_INVAL_NONE; /* to keep compiler quiet */
2759 : : }
2760 : :
2761 : : /*
2762 : : * Maps an ReplicationSlotInvalidationCause to the invalidation
2763 : : * reason for a replication slot.
2764 : : */
2765 : : const char *
250 akapila@postgresql.o 2766 :CBC 4 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2767 : : {
2768 : : /* Search lookup table for the name of this cause */
2769 [ + - ]: 8 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2770 : : {
2771 [ + + ]: 8 : if (SlotInvalidationCauses[i].cause == cause)
2772 : 4 : return SlotInvalidationCauses[i].cause_name;
2773 : : }
2774 : :
250 akapila@postgresql.o 2775 :UBC 0 : Assert(false);
2776 : : return "none"; /* to keep compiler quiet */
2777 : : }
2778 : :
2779 : : /*
2780 : : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2781 : : *
2782 : : * The rawname will be parsed, and the result will be saved into *elemlist.
2783 : : */
2784 : : static bool
483 akapila@postgresql.o 2785 :CBC 9 : validate_sync_standby_slots(char *rawname, List **elemlist)
2786 : : {
2787 : : bool ok;
2788 : :
2789 : : /* Verify syntax and parse string into a list of identifiers */
598 2790 : 9 : ok = SplitIdentifierString(rawname, ',', elemlist);
2791 : :
2792 [ - + ]: 9 : if (!ok)
2793 : : {
598 akapila@postgresql.o 2794 :UBC 0 : GUC_check_errdetail("List syntax is invalid.");
2795 : : }
328 alvherre@alvh.no-ip. 2796 [ + + ]:CBC 9 : else if (MyProc)
2797 : : {
2798 : : /*
2799 : : * Check that each specified slot exist and is physical.
2800 : : *
2801 : : * Because we need an LWLock, we cannot do this on processes without a
2802 : : * PGPROC, so we skip it there; but see comments in
2803 : : * StandbySlotsHaveCaughtup() as to why that's not a problem.
2804 : : */
598 akapila@postgresql.o 2805 : 5 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2806 : :
2807 [ + - + + : 15 : foreach_ptr(char, name, *elemlist)
+ + ]
2808 : : {
2809 : : ReplicationSlot *slot;
2810 : :
2811 : 5 : slot = SearchNamedReplicationSlot(name, false);
2812 : :
2813 [ - + ]: 5 : if (!slot)
2814 : : {
334 alvherre@alvh.no-ip. 2815 :UBC 0 : GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2816 : : name);
598 akapila@postgresql.o 2817 : 0 : ok = false;
2818 : 0 : break;
2819 : : }
2820 : :
598 akapila@postgresql.o 2821 [ - + ]:CBC 5 : if (!SlotIsPhysical(slot))
2822 : : {
334 alvherre@alvh.no-ip. 2823 :UBC 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2824 : : name);
598 akapila@postgresql.o 2825 : 0 : ok = false;
2826 : 0 : break;
2827 : : }
2828 : : }
2829 : :
598 akapila@postgresql.o 2830 :CBC 5 : LWLockRelease(ReplicationSlotControlLock);
2831 : : }
2832 : :
2833 : 9 : return ok;
2834 : : }
2835 : :
2836 : : /*
2837 : : * GUC check_hook for synchronized_standby_slots
2838 : : */
2839 : : bool
483 2840 : 1103 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2841 : : {
2842 : : char *rawname;
2843 : : char *ptr;
2844 : : List *elemlist;
2845 : : int size;
2846 : : bool ok;
2847 : : SyncStandbySlotsConfigData *config;
2848 : :
598 2849 [ + + ]: 1103 : if ((*newval)[0] == '\0')
2850 : 1094 : return true;
2851 : :
2852 : : /* Need a modifiable copy of the GUC string */
2853 : 9 : rawname = pstrdup(*newval);
2854 : :
2855 : : /* Now verify if the specified slots exist and have correct type */
483 2856 : 9 : ok = validate_sync_standby_slots(rawname, &elemlist);
2857 : :
598 2858 [ + - - + ]: 9 : if (!ok || elemlist == NIL)
2859 : : {
598 akapila@postgresql.o 2860 :UBC 0 : pfree(rawname);
2861 : 0 : list_free(elemlist);
2862 : 0 : return ok;
2863 : : }
2864 : :
2865 : : /* Compute the size required for the SyncStandbySlotsConfigData struct */
483 akapila@postgresql.o 2866 :CBC 9 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
598 2867 [ + - + + : 27 : foreach_ptr(char, slot_name, elemlist)
+ + ]
2868 : 9 : size += strlen(slot_name) + 1;
2869 : :
2870 : : /* GUC extra value must be guc_malloc'd, not palloc'd */
483 2871 : 9 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
214 dgustafsson@postgres 2872 [ - + ]: 9 : if (!config)
214 dgustafsson@postgres 2873 :UBC 0 : return false;
2874 : :
2875 : : /* Transform the data into SyncStandbySlotsConfigData */
598 akapila@postgresql.o 2876 :CBC 9 : config->nslotnames = list_length(elemlist);
2877 : :
2878 : 9 : ptr = config->slot_names;
2879 [ + - + + : 27 : foreach_ptr(char, slot_name, elemlist)
+ + ]
2880 : : {
2881 : 9 : strcpy(ptr, slot_name);
2882 : 9 : ptr += strlen(slot_name) + 1;
2883 : : }
2884 : :
333 peter@eisentraut.org 2885 : 9 : *extra = config;
2886 : :
598 akapila@postgresql.o 2887 : 9 : pfree(rawname);
2888 : 9 : list_free(elemlist);
2889 : 9 : return true;
2890 : : }
2891 : :
2892 : : /*
2893 : : * GUC assign_hook for synchronized_standby_slots
2894 : : */
2895 : : void
483 2896 : 1103 : assign_synchronized_standby_slots(const char *newval, void *extra)
2897 : : {
2898 : : /*
2899 : : * The standby slots may have changed, so we must recompute the oldest
2900 : : * LSN.
2901 : : */
598 2902 : 1103 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2903 : :
483 2904 : 1103 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
598 2905 : 1103 : }
2906 : :
2907 : : /*
2908 : : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2909 : : */
2910 : : bool
483 2911 : 38700 : SlotExistsInSyncStandbySlots(const char *slot_name)
2912 : : {
2913 : : const char *standby_slot_name;
2914 : :
2915 : : /* Return false if there is no value in synchronized_standby_slots */
2916 [ + + ]: 38700 : if (synchronized_standby_slots_config == NULL)
598 2917 : 38694 : return false;
2918 : :
2919 : : /*
2920 : : * XXX: We are not expecting this list to be long so a linear search
2921 : : * shouldn't hurt but if that turns out not to be true then we can cache
2922 : : * this information for each WalSender as well.
2923 : : */
483 2924 : 6 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2925 [ + - ]: 6 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2926 : : {
598 2927 [ + - ]: 6 : if (strcmp(standby_slot_name, slot_name) == 0)
2928 : 6 : return true;
2929 : :
598 akapila@postgresql.o 2930 :UBC 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2931 : : }
2932 : :
2933 : 0 : return false;
2934 : : }
2935 : :
2936 : : /*
2937 : : * Return true if the slots specified in synchronized_standby_slots have caught up to
2938 : : * the given WAL location, false otherwise.
2939 : : *
2940 : : * The elevel parameter specifies the error level used for logging messages
2941 : : * related to slots that do not exist, are invalidated, or are inactive.
2942 : : */
2943 : : bool
598 akapila@postgresql.o 2944 :CBC 646 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2945 : : {
2946 : : const char *name;
2947 : 646 : int caught_up_slot_num = 0;
2948 : 646 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2949 : :
2950 : : /*
2951 : : * Don't need to wait for the standbys to catch up if there is no value in
2952 : : * synchronized_standby_slots.
2953 : : */
483 2954 [ + + ]: 646 : if (synchronized_standby_slots_config == NULL)
598 2955 : 623 : return true;
2956 : :
2957 : : /*
2958 : : * Don't need to wait for the standbys to catch up if we are on a standby
2959 : : * server, since we do not support syncing slots to cascading standbys.
2960 : : */
2961 [ - + ]: 23 : if (RecoveryInProgress())
598 akapila@postgresql.o 2962 :UBC 0 : return true;
2963 : :
2964 : : /*
2965 : : * Don't need to wait for the standbys to catch up if they are already
2966 : : * beyond the specified WAL location.
2967 : : */
598 akapila@postgresql.o 2968 [ + + ]:CBC 23 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2969 [ + + ]: 11 : ss_oldest_flush_lsn >= wait_for_lsn)
2970 : 7 : return true;
2971 : :
2972 : : /*
2973 : : * To prevent concurrent slot dropping and creation while filtering the
2974 : : * slots, take the ReplicationSlotControlLock outside of the loop.
2975 : : */
2976 : 16 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2977 : :
483 2978 : 16 : name = synchronized_standby_slots_config->slot_names;
2979 [ + + ]: 21 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2980 : : {
2981 : : XLogRecPtr restart_lsn;
2982 : : bool invalidated;
2983 : : bool inactive;
2984 : : ReplicationSlot *slot;
2985 : :
598 2986 : 16 : slot = SearchNamedReplicationSlot(name, false);
2987 : :
2988 : : /*
2989 : : * If a slot name provided in synchronized_standby_slots does not
2990 : : * exist, report a message and exit the loop.
2991 : : *
2992 : : * Though validate_sync_standby_slots (the GUC check_hook) tries to
2993 : : * avoid this, it can nonetheless happen because the user can specify
2994 : : * a nonexistent slot name before server startup. That function cannot
2995 : : * validate such a slot during startup, as ReplicationSlotCtl is not
2996 : : * initialized by then. Also, the user might have dropped one slot.
2997 : : */
2998 [ - + ]: 16 : if (!slot)
2999 : : {
598 akapila@postgresql.o 3000 [ # # ]:UBC 0 : ereport(elevel,
3001 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3002 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3003 : : name, "synchronized_standby_slots"),
3004 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3005 : : name),
3006 : : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3007 : : name, "synchronized_standby_slots"));
3008 : 0 : break;
3009 : : }
3010 : :
3011 : : /* Same as above: if a slot is not physical, exit the loop. */
598 akapila@postgresql.o 3012 [ - + ]:CBC 16 : if (SlotIsLogical(slot))
3013 : : {
598 akapila@postgresql.o 3014 [ # # ]:UBC 0 : ereport(elevel,
3015 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3016 : : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3017 : : name, "synchronized_standby_slots"),
3018 : : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3019 : : name),
3020 : : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3021 : : name, "synchronized_standby_slots"));
3022 : 0 : break;
3023 : : }
3024 : :
598 akapila@postgresql.o 3025 [ - + ]:CBC 16 : SpinLockAcquire(&slot->mutex);
3026 : 16 : restart_lsn = slot->data.restart_lsn;
3027 : 16 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
3028 : 16 : inactive = slot->active_pid == 0;
3029 : 16 : SpinLockRelease(&slot->mutex);
3030 : :
3031 [ - + ]: 16 : if (invalidated)
3032 : : {
3033 : : /* Specified physical slot has been invalidated */
598 akapila@postgresql.o 3034 [ # # ]:UBC 0 : ereport(elevel,
3035 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3036 : : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3037 : : name, "synchronized_standby_slots"),
3038 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3039 : : name),
3040 : : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3041 : : name, "synchronized_standby_slots"));
3042 : 0 : break;
3043 : : }
3044 : :
598 akapila@postgresql.o 3045 [ + + + + ]:CBC 16 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
3046 : : {
3047 : : /* Log a message if no active_pid for this physical slot */
3048 [ + + ]: 11 : if (inactive)
3049 [ + - ]: 9 : ereport(elevel,
3050 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3051 : : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3052 : : name, "synchronized_standby_slots"),
3053 : : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3054 : : name),
3055 : : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3056 : : name, "synchronized_standby_slots"));
3057 : :
3058 : : /* Continue if the current slot hasn't caught up. */
3059 : 11 : break;
3060 : : }
3061 : :
3062 [ - + ]: 5 : Assert(restart_lsn >= wait_for_lsn);
3063 : :
3064 [ - + - - ]: 5 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
3065 : : min_restart_lsn > restart_lsn)
3066 : 5 : min_restart_lsn = restart_lsn;
3067 : :
3068 : 5 : caught_up_slot_num++;
3069 : :
3070 : 5 : name += strlen(name) + 1;
3071 : : }
3072 : :
3073 : 16 : LWLockRelease(ReplicationSlotControlLock);
3074 : :
3075 : : /*
3076 : : * Return false if not all the standbys have caught up to the specified
3077 : : * WAL location.
3078 : : */
483 3079 [ + + ]: 16 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
598 3080 : 11 : return false;
3081 : :
3082 : : /* The ss_oldest_flush_lsn must not retreat. */
3083 [ + + - + ]: 5 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
3084 : : min_restart_lsn >= ss_oldest_flush_lsn);
3085 : :
3086 : 5 : ss_oldest_flush_lsn = min_restart_lsn;
3087 : :
3088 : 5 : return true;
3089 : : }
3090 : :
3091 : : /*
3092 : : * Wait for physical standbys to confirm receiving the given lsn.
3093 : : *
3094 : : * Used by logical decoding SQL functions. It waits for physical standbys
3095 : : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3096 : : */
3097 : : void
3098 : 218 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3099 : : {
3100 : : /*
3101 : : * Don't need to wait for the standby to catch up if the current acquired
3102 : : * slot is not a logical failover slot, or there is no value in
3103 : : * synchronized_standby_slots.
3104 : : */
483 3105 [ + + + + ]: 218 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
598 3106 : 217 : return;
3107 : :
 : : /*
 : : * Register as a waiter on the walsender's confirm-receipt CV before
 : : * the first caught-up check, so a broadcast that arrives between the
 : : * check and the sleep below is not lost.
 : : */
3108 : 1 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3109 : :
3110 : : for (;;)
3111 : : {
 : : /* Allow query cancel/termination while we wait. */
3112 [ - + ]: 2 : CHECK_FOR_INTERRUPTS();
3113 : :
 : : /*
 : : * Re-read the config file on SIGHUP so that a change to
 : : * synchronized_standby_slots takes effect while waiting.
 : : */
3114 [ + + ]: 2 : if (ConfigReloadPending)
3115 : : {
3116 : 1 : ConfigReloadPending = false;
3117 : 1 : ProcessConfigFile(PGC_SIGHUP);
3118 : : }
3119 : :
3120 : : /* Exit if done waiting for every slot. */
3121 [ + + ]: 2 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3122 : 1 : break;
3123 : :
3124 : : /*
3125 : : * Wait for the slots in the synchronized_standby_slots to catch up,
3126 : : * but use a timeout (1s) so we can also check if the
3127 : : * synchronized_standby_slots has been changed.
3128 : : */
3129 : 1 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3130 : : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3131 : : }
3132 : :
 : : /* Done waiting: deregister ourselves from the CV's wait list. */
3133 : 1 : ConditionVariableCancelSleep();
3134 : : }
|