Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * logicalctl.c
3 : : * Functionality to control logical decoding status online.
4 : : *
5 : : * This module enables dynamic control of logical decoding availability.
6 : : * Logical decoding becomes active under two conditions: when the wal_level
7 : : * parameter is set to 'logical', or when at least one valid logical replication
8 : : * slot exists with wal_level set to 'replica'. The system disables logical
9 : : * decoding when neither condition is met. Therefore, the dynamic control
10 : : * of logical decoding availability is required only when wal_level is set
11 : : * to 'replica'. Logical decoding is always enabled when wal_level='logical'
12 : : * and always disabled when wal_level='minimal'.
13 : : *
14 : : * The core concept of dynamically enabling and disabling logical decoding
15 : : * is to separately control two aspects: writing information required for
16 : : * logical decoding to WAL records, and using logical decoding itself. During
17 : : * activation, we first enable logical WAL writing while keeping logical
18 : : * decoding disabled. This change is reflected in the read-only
19 : : * effective_wal_level GUC parameter. Once we ensure that all processes have
20 : : * updated to the latest effective_wal_level value, we then enable logical
21 : : * decoding. Deactivation follows a similar careful, multi-step process
22 : : * in reverse order.
23 : : *
24 : : * While activation occurs synchronously right after creating the first
25 : : * logical slot, deactivation happens asynchronously through the checkpointer
26 : : * process. This design avoids a race condition at the end of recovery; see
27 : : * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
28 : : * Asynchronous deactivation also avoids excessive toggling of the logical
29 : : * decoding status in workloads that repeatedly create and drop a single
30 : : * logical slot. On the other hand, this lazy approach can delay changes
31 : : * to effective_wal_level and the disabling logical decoding, especially
32 : : * when the checkpointer is busy with other tasks. We chose this lazy approach
33 : : * in all deactivation paths to keep the implementation simple, even though
34 : : * laziness is strictly required only for end-of-recovery cases. Future work
35 : : * might address this limitation either by using a dedicated worker instead
36 : : * of the checkpointer, or by implementing synchronous waiting during slot
37 : : * drops if workloads are significantly affected by the lazy deactivation
38 : : * of logical decoding.
39 : : *
40 : : * Standby servers use the primary server's effective_wal_level and logical
41 : : * decoding status. Unlike normal activation and deactivation, these
42 : : * are updated simultaneously without status change coordination, solely by
43 : : * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
44 : : * setting has no effect during this time. Upon promotion, we update the
45 : : * logical decoding status based on local conditions: the wal_level value and
46 : : * the presence of logical slots.
47 : : *
48 : : * In the future, we could extend support to include automatic transitions
49 : : * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
50 : : * this enhancement would require additional coordination mechanisms and
51 : : * careful implementation of operations such as terminating walsenders and
52 : : * archiver processes while carefully considering the sequence of operations
53 : : * to ensure system stability during these transitions.
54 : : *
55 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
56 : : * Portions Copyright (c) 1994, Regents of the University of California
57 : : *
58 : : * IDENTIFICATION
59 : : * src/backend/replication/logical/logicalctl.c
60 : : *
61 : : *-------------------------------------------------------------------------
62 : : */
63 : :
64 : : #include "postgres.h"
65 : :
66 : : #include "access/xloginsert.h"
67 : : #include "catalog/pg_control.h"
68 : : #include "miscadmin.h"
69 : : #include "replication/slot.h"
70 : : #include "storage/ipc.h"
71 : : #include "storage/lmgr.h"
72 : : #include "storage/proc.h"
73 : : #include "storage/procarray.h"
74 : : #include "storage/procsignal.h"
75 : : #include "utils/injection_point.h"
76 : :
77 : : /*
78 : : * Struct for controlling the logical decoding status.
79 : : *
80 : : * This struct is protected by LogicalDecodingControlLock.
81 : : */
82 : : typedef struct LogicalDecodingCtlData
83 : : {
84 : : /*
85 : : * This is the authoritative value used by all processes to determine
86 : : * whether to write additional information required by logical decoding to
87 : : * WAL. Since this information could be checked frequently, each process
88 : : * caches this value in XLogLogicalInfo for better performance.
89 : : */
90 : : bool xlog_logical_info;
91 : :
92 : : /* True if logical decoding is available in the system */
93 : : bool logical_decoding_enabled;
94 : :
95 : : /* True if logical decoding might need to be disabled */
96 : : bool pending_disable;
97 : : } LogicalDecodingCtlData;
98 : :
99 : : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
100 : :
101 : : /*
102 : : * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
103 : : * initialized at process startup, and updated when processing the process
104 : : * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
105 : : * is in an XID-assigned transaction, the cache update is delayed until the
106 : : * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
107 : : */
108 : : bool XLogLogicalInfo = false;
109 : :
110 : : /*
111 : : * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
112 : : * an XID is assigned to the current transaction, the process sets this flag and
113 : : * delays the XLogLogicalInfo update until the transaction ends. This ensures
114 : : * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
115 : : * remains consistent throughout the transaction.
116 : : */
117 : : static bool XLogLogicalInfoUpdatePending = false;
118 : :
119 : : static void update_xlog_logical_info(void);
120 : : static void abort_logical_decoding_activation(int code, Datum arg);
121 : : static void write_logical_decoding_status_update_record(bool status);
122 : :
123 : : Size
82 msawada@postgresql.o 124 :GNC 4447 : LogicalDecodingCtlShmemSize(void)
125 : : {
126 : 4447 : return sizeof(LogicalDecodingCtlData);
127 : : }
128 : :
129 : : void
130 : 1150 : LogicalDecodingCtlShmemInit(void)
131 : : {
132 : : bool found;
133 : :
134 : 1150 : LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
135 : : LogicalDecodingCtlShmemSize(),
136 : : &found);
137 : :
138 [ + - ]: 1150 : if (!found)
139 [ + - - + : 1150 : MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize());
- - - - -
- ]
140 : 1150 : }
141 : :
142 : : /*
143 : : * Initialize the logical decoding status in shmem at server startup. This
144 : : * must be called ONCE during postmaster or standalone-backend startup.
145 : : */
146 : : void
147 : 1000 : StartupLogicalDecodingStatus(bool last_status)
148 : : {
149 : : /* Logical decoding is always disabled when 'minimal' WAL level */
150 [ + + ]: 1000 : if (wal_level == WAL_LEVEL_MINIMAL)
151 : 351 : return;
152 : :
153 : : /*
154 : : * Set the initial logical decoding status based on the last status. If
155 : : * logical decoding was enabled before the last shutdown, it remains
156 : : * enabled as we might have set wal_level='logical' or have at least one
157 : : * logical slot.
158 : : */
159 : 649 : LogicalDecodingCtl->xlog_logical_info = last_status;
160 : 649 : LogicalDecodingCtl->logical_decoding_enabled = last_status;
161 : : }
162 : :
163 : : /*
164 : : * Update the XLogLogicalInfo cache.
165 : : */
166 : : static inline void
167 : 23630 : update_xlog_logical_info(void)
168 : : {
169 : 23630 : XLogLogicalInfo = IsXLogLogicalInfoEnabled();
170 : 23630 : }
171 : :
172 : : /*
173 : : * Initialize XLogLogicalInfo backend-private cache. This routine is called
174 : : * during process initialization.
175 : : */
176 : : void
177 : 21544 : InitializeProcessXLogLogicalInfo(void)
178 : : {
179 : 21544 : update_xlog_logical_info();
180 : 21544 : }
181 : :
182 : : /*
183 : : * This routine is called when we are told to update XLogLogicalInfo
184 : : * by a ProcSignalBarrier.
185 : : */
186 : : bool
187 : 2086 : ProcessBarrierUpdateXLogLogicalInfo(void)
188 : : {
189 [ - + ]: 2086 : if (GetTopTransactionIdIfAny() != InvalidTransactionId)
190 : : {
191 : : /* Delay updating XLogLogicalInfo until the transaction end */
82 msawada@postgresql.o 192 :UNC 0 : XLogLogicalInfoUpdatePending = true;
193 : : }
194 : : else
82 msawada@postgresql.o 195 :GNC 2086 : update_xlog_logical_info();
196 : :
197 : 2086 : return true;
198 : : }
199 : :
200 : : /*
201 : : * Check the shared memory state and return true if logical decoding is
202 : : * enabled on the system.
203 : : */
204 : : bool
205 : 17394 : IsLogicalDecodingEnabled(void)
206 : : {
207 : : bool enabled;
208 : :
209 : 17394 : LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
210 : 17394 : enabled = LogicalDecodingCtl->logical_decoding_enabled;
211 : 17394 : LWLockRelease(LogicalDecodingControlLock);
212 : :
213 : 17394 : return enabled;
214 : : }
215 : :
216 : : /*
217 : : * Returns true if logical WAL logging is enabled based on the shared memory
218 : : * status.
219 : : */
220 : : bool
221 : 24003 : IsXLogLogicalInfoEnabled(void)
222 : : {
223 : : bool xlog_logical_info;
224 : :
225 : 24003 : LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
226 : 24003 : xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
227 : 24003 : LWLockRelease(LogicalDecodingControlLock);
228 : :
229 : 24003 : return xlog_logical_info;
230 : : }
231 : :
232 : : /*
233 : : * Reset the local cache at end of the transaction.
234 : : */
235 : : void
236 : 337747 : AtEOXact_LogicalCtl(void)
237 : : {
238 : : /* Update the local cache if there is a pending update */
239 [ - + ]: 337747 : if (XLogLogicalInfoUpdatePending)
240 : : {
82 msawada@postgresql.o 241 :UNC 0 : update_xlog_logical_info();
242 : 0 : XLogLogicalInfoUpdatePending = false;
243 : : }
82 msawada@postgresql.o 244 :GNC 337747 : }
245 : :
246 : : /*
247 : : * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
248 : : * status.
249 : : */
250 : : static void
251 : 83 : write_logical_decoding_status_update_record(bool status)
252 : : {
253 : : XLogRecPtr recptr;
254 : :
255 : 83 : XLogBeginInsert();
256 : 83 : XLogRegisterData(&status, sizeof(bool));
257 : 83 : recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
258 : 83 : XLogFlush(recptr);
259 : 83 : }
260 : :
261 : : /*
262 : : * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
263 : : * the shared flags to revert the logical decoding activation process.
264 : : */
265 : : static void
266 : 1 : abort_logical_decoding_activation(int code, Datum arg)
267 : : {
268 [ - + ]: 1 : Assert(MyReplicationSlot);
269 [ - + ]: 1 : Assert(!LogicalDecodingCtl->logical_decoding_enabled);
270 : :
271 [ + - ]: 1 : elog(DEBUG1, "aborting logical decoding activation process");
272 : :
273 : : /*
274 : : * Abort the change to xlog_logical_info. We don't need to check
275 : : * CheckLogicalSlotExists() as we're still holding a logical slot.
276 : : */
277 : 1 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
278 : 1 : LogicalDecodingCtl->xlog_logical_info = false;
279 : 1 : LWLockRelease(LogicalDecodingControlLock);
280 : :
281 : : /*
282 : : * Some processes might have already started logical info WAL logging, so
283 : : * tell all running processes to update their caches. We don't need to
284 : : * wait for all processes to disable xlog_logical_info locally as it's
285 : : * always safe to write logical information to WAL records, even when not
286 : : * strictly required.
287 : : */
288 : 1 : EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
289 : 1 : }
290 : :
291 : : /*
292 : : * Enable logical decoding if disabled.
293 : : *
294 : : * If this function is called during recovery, it simply returns without
295 : : * action since the logical decoding status change is not allowed during
296 : : * this time. The logical decoding status depends on the status on the primary.
297 : : * The caller should use CheckLogicalDecodingRequirements() before calling this
298 : : * function to make sure that the logical decoding status can be modified.
299 : : *
300 : : * Note that there is no interlock between logical decoding activation
301 : : * and slot creation. To ensure enabling logical decoding, the caller
302 : : * needs to call this function after creating a logical slot before
303 : : * initializing the logical decoding context.
304 : : */
305 : : void
306 : 493 : EnsureLogicalDecodingEnabled(void)
307 : : {
308 [ - + ]: 493 : Assert(MyReplicationSlot);
309 [ - + ]: 493 : Assert(wal_level >= WAL_LEVEL_REPLICA);
310 : :
311 : : /* Logical decoding is always enabled */
312 [ + + ]: 493 : if (wal_level >= WAL_LEVEL_LOGICAL)
313 : 483 : return;
314 : :
315 [ + + ]: 10 : if (RecoveryInProgress())
316 : : {
317 : : /*
318 : : * CheckLogicalDecodingRequirements() must have already errored out if
319 : : * logical decoding is not enabled since we cannot enable the logical
320 : : * decoding status during recovery.
321 : : */
322 [ - + ]: 3 : Assert(IsLogicalDecodingEnabled());
323 : 3 : return;
324 : : }
325 : :
326 : : /*
327 : : * Ensure to abort the activation process in cases where there in an
328 : : * interruption during the wait.
329 : : */
330 [ + + ]: 7 : PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
331 : : {
332 : 7 : EnableLogicalDecoding();
333 : : }
334 [ - + ]: 7 : PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
335 : : }
336 : :
337 : : /*
338 : : * A workhorse function to enable logical decoding.
339 : : */
340 : : void
341 : 16 : EnableLogicalDecoding(void)
342 : : {
343 : : bool in_recovery;
344 : :
345 : 16 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
346 : :
347 : : /* Return if it is already enabled */
348 [ + + ]: 16 : if (LogicalDecodingCtl->logical_decoding_enabled)
349 : : {
350 : 2 : LogicalDecodingCtl->pending_disable = false;
351 : 2 : LWLockRelease(LogicalDecodingControlLock);
352 : 2 : return;
353 : : }
354 : :
355 : : /*
356 : : * Set logical info WAL logging in shmem. All process starts after this
357 : : * point will include the information required by logical decoding to WAL
358 : : * records.
359 : : */
360 : 14 : LogicalDecodingCtl->xlog_logical_info = true;
361 : :
362 : 14 : LWLockRelease(LogicalDecodingControlLock);
363 : :
364 : : /*
365 : : * Tell all running processes to reflect the xlog_logical_info update, and
366 : : * wait. This ensures that all running processes have enabled logical
367 : : * information WAL logging.
368 : : */
369 : 14 : WaitForProcSignalBarrier(
370 : : EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
371 : :
372 : 14 : INJECTION_POINT("logical-decoding-activation", NULL);
373 : :
374 : 13 : in_recovery = RecoveryInProgress();
375 : :
376 : : /*
377 : : * There could be some transactions that might have started with the old
378 : : * status, but we don't need to wait for these transactions to complete as
379 : : * long as they have valid XIDs. These transactions will appear in the
380 : : * xl_running_xacts record and therefore the snapshot builder will not try
381 : : * to decode the transaction during the logical decoding initialization.
382 : : *
383 : : * There is a theoretical case where a transaction decides whether to
384 : : * include logical-info to WAL records before getting an XID. In this
385 : : * case, the transaction won't appear in xl_running_xacts.
386 : : *
387 : : * For operations that do not require an XID assignment, the process
388 : : * starts including logical-info immediately upon receiving the signal
389 : : * (barrier). If such an operation checks the effective_wal_level multiple
390 : : * times within a single execution, the resulting WAL records might be
391 : : * inconsistent (i.e., logical-info is included in some records but not in
392 : : * others). However, this is harmless because logical decoding generally
393 : : * ignores WAL records that are not associated with an assigned XID.
394 : : *
395 : : * One might think we need to wait for all running transactions, including
396 : : * those without XIDs and read-only transactions, to finish before
397 : : * enabling logical decoding. However, such a requirement would force the
398 : : * slot creation to wait for a potentially very long time due to
399 : : * long-running read queries, which is practically unacceptable.
400 : : */
401 : :
402 : 13 : START_CRIT_SECTION();
403 : :
404 : : /*
405 : : * We enable logical decoding first, followed by writing the WAL record.
406 : : * This sequence ensures logical decoding becomes available on the primary
407 : : * first.
408 : : */
409 : 13 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
410 : :
411 : 13 : LogicalDecodingCtl->logical_decoding_enabled = true;
412 : :
413 [ + + ]: 13 : if (!in_recovery)
414 : 4 : write_logical_decoding_status_update_record(true);
415 : :
416 : 13 : LogicalDecodingCtl->pending_disable = false;
417 : :
418 : 13 : LWLockRelease(LogicalDecodingControlLock);
419 : :
420 [ - + ]: 13 : END_CRIT_SECTION();
421 : :
422 [ + + ]: 13 : if (!in_recovery)
423 [ + - ]: 4 : ereport(LOG,
424 : : errmsg("logical decoding is enabled upon creating a new logical replication slot"));
425 : : }
426 : :
427 : : /*
428 : : * Initiate a request for disabling logical decoding.
429 : : *
430 : : * Note that this function does not verify whether logical slots exist. The
431 : : * checkpointer will verify if logical decoding should actually be disabled.
432 : : */
433 : : void
434 : 425 : RequestDisableLogicalDecoding(void)
435 : : {
436 [ + + ]: 425 : if (wal_level != WAL_LEVEL_REPLICA)
437 : 414 : return;
438 : :
439 : : /*
440 : : * It's possible that we might not actually need to disable logical
441 : : * decoding if someone creates a new logical slot concurrently. We set the
442 : : * flag anyway and the checkpointer will check it and disable logical
443 : : * decoding if necessary.
444 : : */
445 : 11 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
446 : 11 : LogicalDecodingCtl->pending_disable = true;
447 : 11 : LWLockRelease(LogicalDecodingControlLock);
448 : :
449 : 11 : WakeupCheckpointer();
450 : :
451 [ + - ]: 11 : elog(DEBUG1, "requested disabling logical decoding");
452 : : }
453 : :
454 : : /*
455 : : * Disable logical decoding if necessary.
456 : : *
457 : : * This function disables logical decoding upon a request initiated by
458 : : * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
459 : : */
460 : : void
461 : 4665 : DisableLogicalDecodingIfNecessary(void)
462 : : {
463 : : bool pending_disable;
464 : :
465 [ + + ]: 4665 : if (wal_level != WAL_LEVEL_REPLICA)
466 : 1206 : return;
467 : :
468 : : /*
469 : : * Sanity check as we cannot disable logical decoding while holding a
470 : : * logical slot.
471 : : */
472 [ - + ]: 3459 : Assert(!MyReplicationSlot);
473 : :
474 [ + + ]: 3459 : if (RecoveryInProgress())
475 : 1738 : return;
476 : :
477 : 1721 : LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
478 : 1721 : pending_disable = LogicalDecodingCtl->pending_disable;
479 : 1721 : LWLockRelease(LogicalDecodingControlLock);
480 : :
481 : : /* Quick return if no pending disable request */
482 [ + + ]: 1721 : if (!pending_disable)
483 : 1711 : return;
484 : :
485 : 10 : DisableLogicalDecoding();
486 : : }
487 : :
488 : : /*
489 : : * A workhorse function to disable logical decoding.
490 : : */
491 : : void
492 : 19 : DisableLogicalDecoding(void)
493 : : {
494 : 19 : bool in_recovery = RecoveryInProgress();
495 : :
496 : 19 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
497 : :
498 : : /*
499 : : * Check if we can disable logical decoding.
500 : : *
501 : : * Skip CheckLogicalSlotExists() check during recovery because the
502 : : * existing slots will be invalidated after disabling logical decoding.
503 : : */
504 [ + + ]: 19 : if (!LogicalDecodingCtl->logical_decoding_enabled ||
505 [ + + + + ]: 16 : (!in_recovery && CheckLogicalSlotExists()))
506 : : {
507 : 4 : LogicalDecodingCtl->pending_disable = false;
508 : 4 : LWLockRelease(LogicalDecodingControlLock);
509 : 4 : return;
510 : : }
511 : :
512 : 15 : START_CRIT_SECTION();
513 : :
514 : : /*
515 : : * We need to disable logical decoding first and then disable logical
516 : : * information WAL logging in order to ensure that no logical decoding
517 : : * processes WAL records with insufficient information.
518 : : */
519 : 15 : LogicalDecodingCtl->logical_decoding_enabled = false;
520 : :
521 : : /* Write the WAL to disable logical decoding on standbys too */
522 [ + + ]: 15 : if (!in_recovery)
523 : 6 : write_logical_decoding_status_update_record(false);
524 : :
525 : : /* Now disable logical information WAL logging */
526 : 15 : LogicalDecodingCtl->xlog_logical_info = false;
527 : 15 : LogicalDecodingCtl->pending_disable = false;
528 : :
529 [ - + ]: 15 : END_CRIT_SECTION();
530 : :
531 [ + + ]: 15 : if (!in_recovery)
532 [ + - ]: 6 : ereport(LOG,
533 : : errmsg("logical decoding is disabled because there are no valid logical replication slots"));
534 : :
535 : 15 : LWLockRelease(LogicalDecodingControlLock);
536 : :
537 : : /*
538 : : * Tell all running processes to reflect the xlog_logical_info update.
539 : : * Unlike when enabling logical decoding, we don't need to wait for all
540 : : * processes to complete it in this case. We already disabled logical
541 : : * decoding and it's always safe to write logical information to WAL
542 : : * records, even when not strictly required. Therefore, we don't need to
543 : : * wait for all running transactions to finish either.
544 : : */
545 : 15 : EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
546 : : }
547 : :
548 : : /*
549 : : * Updates the logical decoding status at end of recovery, and ensures that
550 : : * all running processes have the updated XLogLogicalInfo status. This
551 : : * function must be called before accepting writes.
552 : : */
553 : : void
554 : 939 : UpdateLogicalDecodingStatusEndOfRecovery(void)
555 : : {
556 : 939 : bool new_status = false;
557 : :
558 [ - + ]: 939 : Assert(RecoveryInProgress());
559 : :
560 : : /*
561 : : * With 'minimal' WAL level, there are no logical replication slots during
562 : : * recovery. Logical decoding is always disabled, so there is no need to
563 : : * synchronize XLogLogicalInfo.
564 : : */
565 [ + + ]: 939 : if (wal_level == WAL_LEVEL_MINIMAL)
566 : : {
567 [ + - - + ]: 351 : Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
568 : 351 : return;
569 : : }
570 : :
571 : 588 : LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
572 : :
573 [ + + + + ]: 588 : if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
574 : 131 : new_status = true;
575 : :
576 : : /*
577 : : * When recovery ends, we need to either enable or disable logical
578 : : * decoding based on the wal_level setting and the presence of logical
579 : : * slots. We need to note that concurrent slot creation and deletion could
580 : : * happen but WAL writes are still not permitted until recovery fully
581 : : * completes. Here's how we handle concurrent toggling of logical
582 : : * decoding:
583 : : *
584 : : * For 'enable' case, if there's a concurrent disable request before
585 : : * recovery fully completes, the checkpointer will handle it after
586 : : * recovery is done. This means there might be a brief period after
587 : : * recovery where logical decoding remains enabled even with no logical
588 : : * replication slots present. This temporary state is not new - it can
589 : : * already occur due to the checkpointer's asynchronous deactivation
590 : : * process.
591 : : *
592 : : * For 'disable' case, backend cannot create logical replication slots
593 : : * during recovery (see checks in CheckLogicalDecodingRequirements()),
594 : : * which prevents a race condition between disabling logical decoding and
595 : : * concurrent slot creation.
596 : : */
597 [ + + ]: 588 : if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
598 : : {
599 : : /*
600 : : * Update both the logical decoding status and logical WAL logging
601 : : * status. Unlike toggling these status during non-recovery, we don't
602 : : * need to worry about the operation order as WAL writes are still not
603 : : * permitted.
604 : : */
605 : 73 : LogicalDecodingCtl->xlog_logical_info = new_status;
606 : 73 : LogicalDecodingCtl->logical_decoding_enabled = new_status;
607 : :
608 [ + + ]: 73 : elog(DEBUG1,
609 : : "update logical decoding status to %d at the end of recovery",
610 : : new_status);
611 : :
612 : : /*
613 : : * Now that we updated the logical decoding status, clear the pending
614 : : * disable flag. It's possible that a concurrent process drops the
615 : : * last logical slot and initiates the pending disable again. The
616 : : * checkpointer process will check it.
617 : : */
618 : 73 : LogicalDecodingCtl->pending_disable = false;
619 : :
620 : 73 : LWLockRelease(LogicalDecodingControlLock);
621 : :
622 : 73 : write_logical_decoding_status_update_record(new_status);
623 : : }
624 : : else
625 : 515 : LWLockRelease(LogicalDecodingControlLock);
626 : :
627 : : /*
628 : : * Ensure all running processes have the updated status. We don't need to
629 : : * wait for running transactions to finish as we don't accept any writes
630 : : * yet. On the other hand, we need to wait for synchronizing
631 : : * XLogLogicalInfo even if we've not updated the status above as the
632 : : * status have been turned on and off during recovery, having running
633 : : * processes have different status on their local caches.
634 : : */
635 [ + + ]: 588 : if (IsUnderPostmaster)
636 : 467 : WaitForProcSignalBarrier(
637 : : EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
638 : :
639 : 588 : INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
640 : : }
|