/*-------------------------------------------------------------------------
 * slot.h
 *	   Replication slot management.
 *
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
 *
 *-------------------------------------------------------------------------
 */
#ifndef SLOT_H
#define SLOT_H

#include "access/xlog.h"
#include "access/xlogreader.h"
#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "replication/walreceiver.h"

/* directory to store replication slot data in */
#define PG_REPLSLOT_DIR		"pg_replslot"

/*
 * The reserved name for a replication slot used to retain dead tuples for
 * conflict detection in logical replication.  See
 * maybe_advance_nonremovable_xid() for details.
 */
#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
/*
 * Behaviour of replication slots upon release or crash.
 *
 * Slots marked as PERSISTENT are crash-safe and will not be dropped when
 * released.  Slots marked as EPHEMERAL will be dropped when released or
 * after restarts.  Slots marked TEMPORARY will be dropped at the end of a
 * session or on error.
 *
 * EPHEMERAL is used as a not-quite-ready state when creating persistent
 * slots.  EPHEMERAL slots can be made PERSISTENT by calling
 * ReplicationSlotPersist().  For a slot that goes away at the end of a
 * session, TEMPORARY is the appropriate choice.
 */
typedef enum ReplicationSlotPersistency
{
	RS_PERSISTENT,
	RS_EPHEMERAL,
	RS_TEMPORARY,
} ReplicationSlotPersistency;
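
/*
 * Illustrative sketch (not code from the tree; the slot name is
 * hypothetical): a persistent slot is typically brought up through the
 * EPHEMERAL -> PERSISTENT handshake described above, using the functions
 * declared later in this file:
 *
 *		ReplicationSlotCreate("my_slot", true, RS_EPHEMERAL,
 *							  false, false, false);
 *		... finish initializing the slot's state ...
 *		ReplicationSlotPersist();
 *
 * If the backend errors out or crashes before ReplicationSlotPersist() is
 * reached, the EPHEMERAL slot is dropped rather than left half-initialized.
 */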

/*
 * Slots can be invalidated, e.g. due to max_slot_wal_keep_size.  If so, the
 * 'invalidated' field is set to a value other than _NONE.
 *
 * When adding a new invalidation cause here, the value must be a power of 2
 * (e.g., 1, 2, 4, ...) so that causes can be combined and tested with
 * bitwise operations.  Also remember to update RS_INVAL_MAX_CAUSES below,
 * and SlotInvalidationCauses in slot.c.
 */
typedef enum ReplicationSlotInvalidationCause
{
	RS_INVAL_NONE = 0,
	/* required WAL has been removed */
	RS_INVAL_WAL_REMOVED = (1 << 0),
	/* required rows have been removed */
	RS_INVAL_HORIZON = (1 << 1),
	/* wal_level insufficient for slot */
	RS_INVAL_WAL_LEVEL = (1 << 2),
	/* idle slot timeout has occurred */
	RS_INVAL_IDLE_TIMEOUT = (1 << 3),
} ReplicationSlotInvalidationCause;

/* Maximum number of invalidation causes */
#define RS_INVAL_MAX_CAUSES 4
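
/*
 * Because each cause is a distinct bit, several causes can be packed into a
 * single mask, as InvalidateObsoleteReplicationSlots() below does with its
 * possible_causes argument.  An illustrative (hypothetical) test:
 *
 *		uint32	possible_causes = RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT;
 *
 *		if (possible_causes & RS_INVAL_WAL_REMOVED)
 *			... consider invalidating slots whose required WAL was removed ...
 */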

/*
 * Slot synchronization may be skipped while the slot synchronization worker
 * is running or while pg_sync_replication_slots() is being executed.  This
 * enum defines the possible reasons for skipping it.
 */
typedef enum SlotSyncSkipReason
{
	SS_SKIP_NONE,				/* No skip */
	SS_SKIP_WAL_NOT_FLUSHED,	/* Standby did not flush the WAL
								 * corresponding to the confirmed flush of
								 * the remote slot */
	SS_SKIP_WAL_OR_ROWS_REMOVED,	/* Remote slot is behind; required WAL or
									 * rows may be removed or at risk */
	SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
									 * snapshot */
	SS_SKIP_INVALID				/* Local slot is invalid */
} SlotSyncSkipReason;
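
/*
 * An illustrative sketch (an assumption about usage, not code from slot.c):
 * when synchronization of a slot is skipped, the reason would be recorded in
 * the slot's shared state under its mutex, per the locking rules documented
 * above ReplicationSlot below:
 *
 *		SpinLockAcquire(&slot->mutex);
 *		slot->slotsync_skip_reason = SS_SKIP_WAL_NOT_FLUSHED;
 *		SpinLockRelease(&slot->mutex);
 */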

/*
 * On-disk data of a replication slot, preserved across restarts.
 */
typedef struct ReplicationSlotPersistentData
{
	/* The slot's identifier */
	NameData	name;

	/* database the slot is active on */
	Oid			database;

	/*
	 * The slot's behaviour when being dropped (or restored after a crash).
	 */
	ReplicationSlotPersistency persistency;

	/*
	 * xmin horizon for data
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin, below.
	 */
	TransactionId xmin;

	/*
	 * xmin horizon for catalog tuples
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin, below.
	 */
	TransactionId catalog_xmin;

	/* oldest LSN that might be required by this replication slot */
	XLogRecPtr	restart_lsn;

	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
	ReplicationSlotInvalidationCause invalidated;

	/*
	 * Oldest LSN that the client has acked receipt for.  This is used as the
	 * start_lsn point in case the client doesn't specify one, and also as a
	 * safety measure to jump forwards in case the client specifies a
	 * start_lsn that's further in the past than this value.
	 */
	XLogRecPtr	confirmed_flush;

	/*
	 * LSN at which we enabled two_phase commit for this slot, or LSN at
	 * which we found a consistent point at the time of slot creation.
	 */
	XLogRecPtr	two_phase_at;

	/*
	 * Allow decoding of prepared transactions?
	 */
	bool		two_phase;

	/* plugin name */
	NameData	plugin;

	/*
	 * Was this slot synchronized from the primary server?
	 */
	bool		synced;

	/*
	 * Is this a failover slot (sync candidate for standbys)?  Only relevant
	 * for logical slots on the primary server.
	 */
	bool		failover;
} ReplicationSlotPersistentData;

/*
 * Shared memory state of a single replication slot.
 *
 * The in-memory data of replication slots follows a locking model based on
 * two linked concepts (see the illustrative sketch after
 * ReplicationSlotCtlData below):
 * - A replication slot's in_use flag is switched when the slot is added or
 *   discarded, under the LWLock ReplicationSlotControlLock.  The backend
 *   owning the slot and performing the operation must hold that lock in
 *   exclusive mode when updating the flag, while readers (concurrent
 *   backends not owning the slot) must hold it in shared mode when looking
 *   at replication slot data.
 * - Individual fields are protected by the slot's mutex, and only the
 *   backend owning the slot is authorized to update them.  The owning
 *   backend does not need to take the mutex when reading its own fields,
 *   while concurrent backends not owning the slot should take it when
 *   reading this slot's data.
 */
typedef struct ReplicationSlot
{
	/* lock, on same cacheline as effective_xmin */
	slock_t		mutex;

	/* is this slot defined */
	bool		in_use;

	/* Who is streaming out changes for this slot? 0 in unused slots. */
	pid_t		active_pid;

	/* any outstanding modifications? */
	bool		just_dirtied;
	bool		dirty;

	/*
	 * For logical decoding, it's extremely important that we never remove
	 * any data that's still needed for decoding purposes, even after a
	 * crash; otherwise, decoding will produce wrong answers.  Ordinary
	 * streaming replication also needs to prevent old row versions from
	 * being removed too soon, but the worst consequence we might encounter
	 * there is unwanted query cancellations on the standby.  Thus, for
	 * logical decoding, this value represents the latest xmin that has
	 * actually been written to disk, whereas for streaming replication, it's
	 * just the same as the persistent value (data.xmin).
	 */
	TransactionId effective_xmin;
	TransactionId effective_catalog_xmin;

	/* data surviving shutdowns and crashes */
	ReplicationSlotPersistentData data;

	/* is somebody performing io on this slot? */
	LWLock		io_in_progress_lock;

	/* Condition variable signaled when active_pid changes */
	ConditionVariable active_cv;

	/* all the remaining data is only used for logical slots */

	/*
	 * When the client has confirmed flushes >= candidate_xmin_lsn we can
	 * advance the catalog xmin.  When restart_valid has been passed,
	 * restart_lsn can be increased.
	 */
	TransactionId candidate_catalog_xmin;
	XLogRecPtr	candidate_xmin_lsn;
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;

	/*
	 * This value tracks the last confirmed_flush LSN that was flushed.  It
	 * is used during a shutdown checkpoint to decide whether a logical
	 * slot's data should be forcibly flushed.
	 */
	XLogRecPtr	last_saved_confirmed_flush;

	/*
	 * The time when the slot became inactive.  For synced slots on a standby
	 * server, it represents the time when slot synchronization was most
	 * recently stopped.
	 */
	TimestampTz inactive_since;

	/*
	 * Latest restart_lsn that has been flushed to disk.  For persistent
	 * slots the flushed LSN should be taken into account when calculating
	 * the oldest LSN for WAL segment removal.
	 *
	 * Do not assume that restart_lsn always moves forward, i.e., that the
	 * previously flushed restart_lsn is always behind data.restart_lsn.  In
	 * streaming replication using a physical slot, the restart_lsn is
	 * updated based on the flushed WAL position reported by the walreceiver.
	 *
	 * This replication mode allows duplicate WAL records to be received and
	 * overwritten.  If the walreceiver receives older WAL records and then
	 * reports them as flushed to the walsender, the restart_lsn may appear
	 * to move backward.
	 *
	 * This typically occurs at the beginning of replication.  One reason is
	 * that streaming replication starts at the beginning of a segment, so if
	 * restart_lsn is in the middle of a segment, it will be updated to an
	 * earlier LSN; see RequestXLogStreaming.  Another reason is that the
	 * walreceiver chooses its startpoint based on the replayed LSN, so if
	 * some records have been received but not yet applied, they will be
	 * received again, updating the restart_lsn to an earlier position.
	 */
	XLogRecPtr	last_saved_restart_lsn;

	/*
	 * Reason for the most recent slot synchronization skip.
	 *
	 * Slot sync skips can occur for both temporary and persistent
	 * replication slots.  They are more common for temporary slots, but
	 * persistent slots may also skip synchronization in rare cases (e.g.,
	 * SS_SKIP_WAL_NOT_FLUSHED or SS_SKIP_WAL_OR_ROWS_REMOVED).
	 *
	 * Since temporary slots are dropped after a server restart, persisting
	 * slotsync_skip_reason would provide no practical benefit.
	 */
	SlotSyncSkipReason slotsync_skip_reason;
} ReplicationSlot;

#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)

/*
 * Shared memory control area for all replication slots.
 */
typedef struct ReplicationSlotCtlData
{
	/*
	 * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
	 * reason you can't do that in an otherwise-empty struct.
	 */
	ReplicationSlot replication_slots[1];
} ReplicationSlotCtlData;
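
/*
 * Illustrative sketch (a minimal example following the locking model
 * documented above ReplicationSlot, not code copied from slot.c): how a
 * backend that does not own a slot can safely inspect every defined slot.
 *
 *		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 *		for (int i = 0; i < max_replication_slots; i++)
 *		{
 *			ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 *			XLogRecPtr	restart_lsn;
 *
 *			if (!s->in_use)
 *				continue;
 *
 *			SpinLockAcquire(&s->mutex);
 *			restart_lsn = s->data.restart_lsn;
 *			SpinLockRelease(&s->mutex);
 *
 *			... use restart_lsn, SlotIsLogical(s), etc. ...
 *		}
 *		LWLockRelease(ReplicationSlotControlLock);
 */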

/*
 * Set slot's inactive_since property unless it was previously invalidated.
 */
static inline void
ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
								bool acquire_lock)
{
	if (acquire_lock)
		SpinLockAcquire(&s->mutex);

	if (s->data.invalidated == RS_INVAL_NONE)
		s->inactive_since = ts;

	if (acquire_lock)
		SpinLockRelease(&s->mutex);
}
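
/*
 * A usage note (an interpretation of the locking model above, not
 * normative): callers that already hold s->mutex should pass
 * acquire_lock = false to avoid re-acquiring the spinlock; all other
 * callers should pass acquire_lock = true so the update happens under the
 * slot's mutex.
 */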

/*
 * Pointers to shared memory
 */
extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;

/* GUCs */
extern PGDLLIMPORT int max_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;

/* shmem initialization functions */
extern Size ReplicationSlotsShmemSize(void);
extern void ReplicationSlotsShmemInit(void);

/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
								  ReplicationSlotPersistency persistency,
								  bool two_phase, bool failover,
								  bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotDropAcquired(void);
extern void ReplicationSlotAlter(const char *name, const bool *failover,
								 const bool *two_phase);

extern void ReplicationSlotAcquire(const char *name, bool nowait,
								   bool error_if_invalid);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(bool synced_only);
extern void ReplicationSlotSave(void);
extern void ReplicationSlotMarkDirty(void);
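
/*
 * Illustrative sketch (hypothetical slot name, not code from the tree): the
 * usual pattern for modifying an existing slot with the functions above is
 * acquire -> modify -> mark dirty -> save -> release.
 *
 *		ReplicationSlotAcquire("my_slot", false, true);
 *		... modify MyReplicationSlot's fields under its mutex ...
 *		ReplicationSlotMarkDirty();
 *		ReplicationSlotSave();
 *		ReplicationSlotRelease();
 */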

/* misc stuff */
extern void ReplicationSlotInitialize(void);
extern bool ReplicationSlotValidateName(const char *name,
										bool allow_reserved_name,
										int elevel);
extern bool ReplicationSlotValidateNameInternal(const char *name,
												bool allow_reserved_name,
												int *err_code, char **err_msg, char **err_hint);
extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
											   XLogSegNo oldestSegno,
											   Oid dboid,
											   TransactionId snapshotConflictHorizon);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern int	ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);

extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(bool is_shutdown);

extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
extern ReplicationSlotInvalidationCause
			GetSlotInvalidationCause(const char *cause_name);
extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);

extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);

#endif							/* SLOT_H */