LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - logicalctl.c (source / functions)
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7
Current Date: 2026-03-14 14:10:32 -0400
Baseline: lcov-20260315-024220-baseline
Baseline Date: 2026-03-14 15:27:56 +0100
Legend: Lines: hit, not hit
        Branches: + taken, - not taken, # not executed

                  Coverage  Total  Hit  UNC  GNC
Lines:              97.9 %    144  141    3  141
Functions:         100.0 %     17   17        17
Branches:           70.7 %     92   65   27   65

Line coverage date bins:
(30,360] days:      97.9 %    144  141    3  141
Function coverage date bins:
(30,360] days:     100.0 %     17   17        17
Branch coverage date bins:
(30,360] days:      70.7 %     92   65   27   65

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * logicalctl.c
                                  3                 :                :  *      Functionality to control logical decoding status online.
                                  4                 :                :  *
                                  5                 :                :  * This module enables dynamic control of logical decoding availability.
                                  6                 :                :  * Logical decoding becomes active under two conditions: when the wal_level
                                  7                 :                :  * parameter is set to 'logical', or when at least one valid logical replication
                                  8                 :                :  * slot exists with wal_level set to 'replica'. The system disables logical
                                  9                 :                :  * decoding when neither condition is met. Therefore, the dynamic control
                                 10                 :                :  * of logical decoding availability is required only when wal_level is set
                                 11                 :                :  * to 'replica'. Logical decoding is always enabled when wal_level='logical'
                                 12                 :                :  * and always disabled when wal_level='minimal'.
                                 13                 :                :  *
                                 14                 :                :  * The core concept of dynamically enabling and disabling logical decoding
                                 15                 :                :  * is to separately control two aspects: writing information required for
                                 16                 :                :  * logical decoding to WAL records, and using logical decoding itself. During
                                 17                 :                :  * activation, we first enable logical WAL writing while keeping logical
                                 18                 :                :  * decoding disabled. This change is reflected in the read-only
                                 19                 :                :  * effective_wal_level GUC parameter. Once we ensure that all processes have
                                 20                 :                :  * updated to the latest effective_wal_level value, we then enable logical
                                 21                 :                :  * decoding. Deactivation follows a similar careful, multi-step process
                                 22                 :                :  * in reverse order.
                                 23                 :                :  *
                                 24                 :                :  * While activation occurs synchronously right after creating the first
                                 25                 :                :  * logical slot, deactivation happens asynchronously through the checkpointer
                                 26                 :                :  * process. This design avoids a race condition at the end of recovery; see
                                 27                 :                :  * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
                                 28                 :                :  * Asynchronous deactivation also avoids excessive toggling of the logical
                                 29                 :                :  * decoding status in workloads that repeatedly create and drop a single
                                 30                 :                :  * logical slot. On the other hand, this lazy approach can delay changes
                                 31                 :                :  * to effective_wal_level and the disabling of logical decoding, especially
                                 32                 :                :  * when the checkpointer is busy with other tasks. We chose this lazy approach
                                 33                 :                :  * in all deactivation paths to keep the implementation simple, even though
                                 34                 :                :  * laziness is strictly required only for end-of-recovery cases. Future work
                                 35                 :                :  * might address this limitation either by using a dedicated worker instead
                                 36                 :                :  * of the checkpointer, or by implementing synchronous waiting during slot
                                 37                 :                :  * drops if workloads are significantly affected by the lazy deactivation
                                 38                 :                :  * of logical decoding.
                                 39                 :                :  *
                                 40                 :                :  * Standby servers use the primary server's effective_wal_level and logical
                                 41                 :                :  * decoding status. Unlike normal activation and deactivation, these
                                 42                 :                :  * are updated simultaneously without status change coordination, solely by
                                 43                 :                :  * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
                                 44                 :                :  * setting has no effect during this time. Upon promotion, we update the
                                 45                 :                :  * logical decoding status based on local conditions: the wal_level value and
                                 46                 :                :  * the presence of logical slots.
                                 47                 :                :  *
                                 48                 :                :  * In the future, we could extend support to include automatic transitions
                                 49                 :                :  * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
                                 50                 :                :  * this enhancement would require additional coordination mechanisms and
                                 51                 :                :  * careful implementation of operations such as terminating walsenders and
                                 52                 :                :  * archiver processes while carefully considering the sequence of operations
                                 53                 :                :  * to ensure system stability during these transitions.
                                 54                 :                :  *
                                 55                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                 56                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                 57                 :                :  *
                                 58                 :                :  * IDENTIFICATION
                                 59                 :                :  *    src/backend/replication/logical/logicalctl.c
                                 60                 :                :  *
                                 61                 :                :  *-------------------------------------------------------------------------
                                 62                 :                :  */
                                 63                 :                : 
                                 64                 :                : #include "postgres.h"
                                 65                 :                : 
                                 66                 :                : #include "access/xloginsert.h"
                                 67                 :                : #include "catalog/pg_control.h"
                                 68                 :                : #include "miscadmin.h"
                                 69                 :                : #include "replication/slot.h"
                                 70                 :                : #include "storage/ipc.h"
                                 71                 :                : #include "storage/lmgr.h"
                                 72                 :                : #include "storage/proc.h"
                                 73                 :                : #include "storage/procarray.h"
                                 74                 :                : #include "storage/procsignal.h"
                                 75                 :                : #include "utils/injection_point.h"
                                 76                 :                : 
                                 77                 :                : /*
                                 78                 :                :  * Struct for controlling the logical decoding status.
                                 79                 :                :  *
                                 80                 :                :  * This struct is protected by LogicalDecodingControlLock.
                                 81                 :                :  */
                                 82                 :                : typedef struct LogicalDecodingCtlData
                                 83                 :                : {
                                 84                 :                :     /*
                                 85                 :                :      * This is the authoritative value used by all processes to determine
                                 86                 :                :      * whether to write additional information required by logical decoding to
                                 87                 :                :      * WAL. Since this information could be checked frequently, each process
                                 88                 :                :      * caches this value in XLogLogicalInfo for better performance.
                                 89                 :                :      */
                                 90                 :                :     bool        xlog_logical_info;
                                 91                 :                : 
                                 92                 :                :     /* True if logical decoding is available in the system */
                                 93                 :                :     bool        logical_decoding_enabled;
                                 94                 :                : 
                                 95                 :                :     /* True if logical decoding might need to be disabled */
                                 96                 :                :     bool        pending_disable;
                                 97                 :                : } LogicalDecodingCtlData;
                                 98                 :                : 
                                 99                 :                : static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
                                100                 :                : 
                                101                 :                : /*
                                102                 :                :  * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
                                103                 :                :  * initialized at process startup, and updated when processing the process
                                104                 :                :  * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
                                105                 :                :  * is in an XID-assigned transaction, the cache update is delayed until the
                                106                 :                :  * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
                                107                 :                :  */
                                108                 :                : bool        XLogLogicalInfo = false;
                                109                 :                : 
                                110                 :                : /*
                                111                 :                :  * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
                                112                 :                :  * an XID is assigned to the current transaction, the process sets this flag and
                                113                 :                :  * delays the XLogLogicalInfo update until the transaction ends. This ensures
                                114                 :                :  * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
                                115                 :                :  * remains consistent throughout the transaction.
                                116                 :                :  */
                                117                 :                : static bool XLogLogicalInfoUpdatePending = false;
                                118                 :                : 
                                119                 :                : static void update_xlog_logical_info(void);
                                120                 :                : static void abort_logical_decoding_activation(int code, Datum arg);
                                121                 :                : static void write_logical_decoding_status_update_record(bool status);
                                122                 :                : 
                                123                 :                : Size
   82 msawada@postgresql.o      124                 :GNC        4447 : LogicalDecodingCtlShmemSize(void)
                                125                 :                : {
                                126                 :           4447 :     return sizeof(LogicalDecodingCtlData);
                                127                 :                : }
                                128                 :                : 
                                129                 :                : void
                                130                 :           1150 : LogicalDecodingCtlShmemInit(void)
                                131                 :                : {
                                132                 :                :     bool        found;
                                133                 :                : 
                                134                 :           1150 :     LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
                                135                 :                :                                          LogicalDecodingCtlShmemSize(),
                                136                 :                :                                          &found);
                                137                 :                : 
                                138         [ +  - ]:           1150 :     if (!found)
                                139   [ +  -  -  +  :           1150 :         MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize());
                                     -  -  -  -  -  
                                                 - ]
                                140                 :           1150 : }
                                141                 :                : 
                                142                 :                : /*
                                143                 :                :  * Initialize the logical decoding status in shmem at server startup. This
                                144                 :                :  * must be called ONCE during postmaster or standalone-backend startup.
                                145                 :                :  */
                                146                 :                : void
                                147                 :           1000 : StartupLogicalDecodingStatus(bool last_status)
                                148                 :                : {
                                 149                 :                :     /* Logical decoding is always disabled when wal_level is 'minimal' */
                                150         [ +  + ]:           1000 :     if (wal_level == WAL_LEVEL_MINIMAL)
                                151                 :            351 :         return;
                                152                 :                : 
                                153                 :                :     /*
                                154                 :                :      * Set the initial logical decoding status based on the last status. If
                                155                 :                :      * logical decoding was enabled before the last shutdown, it remains
                                156                 :                :      * enabled as we might have set wal_level='logical' or have at least one
                                157                 :                :      * logical slot.
                                158                 :                :      */
                                159                 :            649 :     LogicalDecodingCtl->xlog_logical_info = last_status;
                                160                 :            649 :     LogicalDecodingCtl->logical_decoding_enabled = last_status;
                                161                 :                : }
                                162                 :                : 
                                163                 :                : /*
                                164                 :                :  * Update the XLogLogicalInfo cache.
                                165                 :                :  */
                                166                 :                : static inline void
                                167                 :          23630 : update_xlog_logical_info(void)
                                168                 :                : {
                                169                 :          23630 :     XLogLogicalInfo = IsXLogLogicalInfoEnabled();
                                170                 :          23630 : }
                                171                 :                : 
                                172                 :                : /*
                                173                 :                :  * Initialize XLogLogicalInfo backend-private cache. This routine is called
                                174                 :                :  * during process initialization.
                                175                 :                :  */
                                176                 :                : void
                                177                 :          21544 : InitializeProcessXLogLogicalInfo(void)
                                178                 :                : {
                                179                 :          21544 :     update_xlog_logical_info();
                                180                 :          21544 : }
                                181                 :                : 
                                182                 :                : /*
                                183                 :                :  * This routine is called when we are told to update XLogLogicalInfo
                                184                 :                :  * by a ProcSignalBarrier.
                                185                 :                :  */
                                186                 :                : bool
                                187                 :           2086 : ProcessBarrierUpdateXLogLogicalInfo(void)
                                188                 :                : {
                                189         [ -  + ]:           2086 :     if (GetTopTransactionIdIfAny() != InvalidTransactionId)
                                190                 :                :     {
                                 191                 :                :         /* Delay updating XLogLogicalInfo until the transaction ends */
   82 msawada@postgresql.o      192                 :UNC           0 :         XLogLogicalInfoUpdatePending = true;
                                193                 :                :     }
                                194                 :                :     else
   82 msawada@postgresql.o      195                 :GNC        2086 :         update_xlog_logical_info();
                                196                 :                : 
                                197                 :           2086 :     return true;
                                198                 :                : }
                                199                 :                : 
                                200                 :                : /*
                                201                 :                :  * Check the shared memory state and return true if logical decoding is
                                202                 :                :  * enabled on the system.
                                203                 :                :  */
                                204                 :                : bool
                                205                 :          17394 : IsLogicalDecodingEnabled(void)
                                206                 :                : {
                                207                 :                :     bool        enabled;
                                208                 :                : 
                                209                 :          17394 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
                                210                 :          17394 :     enabled = LogicalDecodingCtl->logical_decoding_enabled;
                                211                 :          17394 :     LWLockRelease(LogicalDecodingControlLock);
                                212                 :                : 
                                213                 :          17394 :     return enabled;
                                214                 :                : }
                                215                 :                : 
                                216                 :                : /*
                                217                 :                :  * Returns true if logical WAL logging is enabled based on the shared memory
                                218                 :                :  * status.
                                219                 :                :  */
                                220                 :                : bool
                                221                 :          24003 : IsXLogLogicalInfoEnabled(void)
                                222                 :                : {
                                223                 :                :     bool        xlog_logical_info;
                                224                 :                : 
                                225                 :          24003 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
                                226                 :          24003 :     xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
                                227                 :          24003 :     LWLockRelease(LogicalDecodingControlLock);
                                228                 :                : 
                                229                 :          24003 :     return xlog_logical_info;
                                230                 :                : }
                                231                 :                : 
                                232                 :                : /*
                                233                 :                :  * Reset the local cache at end of the transaction.
                                234                 :                :  */
                                235                 :                : void
                                236                 :         337747 : AtEOXact_LogicalCtl(void)
                                237                 :                : {
                                238                 :                :     /* Update the local cache if there is a pending update */
                                239         [ -  + ]:         337747 :     if (XLogLogicalInfoUpdatePending)
                                240                 :                :     {
   82 msawada@postgresql.o      241                 :UNC           0 :         update_xlog_logical_info();
                                242                 :              0 :         XLogLogicalInfoUpdatePending = false;
                                243                 :                :     }
   82 msawada@postgresql.o      244                 :GNC      337747 : }
                                245                 :                : 
                                246                 :                : /*
                                247                 :                :  * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
                                248                 :                :  * status.
                                249                 :                :  */
                                250                 :                : static void
                                251                 :             83 : write_logical_decoding_status_update_record(bool status)
                                252                 :                : {
                                253                 :                :     XLogRecPtr  recptr;
                                254                 :                : 
                                255                 :             83 :     XLogBeginInsert();
                                256                 :             83 :     XLogRegisterData(&status, sizeof(bool));
                                257                 :             83 :     recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
                                258                 :             83 :     XLogFlush(recptr);
                                259                 :             83 : }
                                260                 :                : 
                                261                 :                : /*
                                262                 :                :  * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
                                263                 :                :  * the shared flags to revert the logical decoding activation process.
                                264                 :                :  */
                                265                 :                : static void
                                266                 :              1 : abort_logical_decoding_activation(int code, Datum arg)
                                267                 :                : {
                                268         [ -  + ]:              1 :     Assert(MyReplicationSlot);
                                269         [ -  + ]:              1 :     Assert(!LogicalDecodingCtl->logical_decoding_enabled);
                                270                 :                : 
                                271         [ +  - ]:              1 :     elog(DEBUG1, "aborting logical decoding activation process");
                                272                 :                : 
                                273                 :                :     /*
                                274                 :                :      * Abort the change to xlog_logical_info. We don't need to check
                                275                 :                :      * CheckLogicalSlotExists() as we're still holding a logical slot.
                                276                 :                :      */
                                277                 :              1 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                278                 :              1 :     LogicalDecodingCtl->xlog_logical_info = false;
                                279                 :              1 :     LWLockRelease(LogicalDecodingControlLock);
                                280                 :                : 
                                281                 :                :     /*
                                282                 :                :      * Some processes might have already started logical info WAL logging, so
                                283                 :                :      * tell all running processes to update their caches. We don't need to
                                284                 :                :      * wait for all processes to disable xlog_logical_info locally as it's
                                285                 :                :      * always safe to write logical information to WAL records, even when not
                                286                 :                :      * strictly required.
                                287                 :                :      */
                                288                 :              1 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
                                289                 :              1 : }
                                290                 :                : 
                                291                 :                : /*
                                292                 :                :  * Enable logical decoding if disabled.
                                293                 :                :  *
                                294                 :                :  * If this function is called during recovery, it simply returns without
                                295                 :                :  * action since the logical decoding status change is not allowed during
                                296                 :                :  * this time. The logical decoding status depends on the status on the primary.
                                297                 :                :  * The caller should use CheckLogicalDecodingRequirements() before calling this
                                298                 :                :  * function to make sure that the logical decoding status can be modified.
                                299                 :                :  *
                                300                 :                :  * Note that there is no interlock between logical decoding activation
                                301                 :                :  * and slot creation. To ensure enabling logical decoding, the caller
                                302                 :                :  * needs to call this function after creating a logical slot before
                                303                 :                :  * initializing the logical decoding context.
                                304                 :                :  */
                                305                 :                : void
                                306                 :            493 : EnsureLogicalDecodingEnabled(void)
                                307                 :                : {
                                308         [ -  + ]:            493 :     Assert(MyReplicationSlot);
                                309         [ -  + ]:            493 :     Assert(wal_level >= WAL_LEVEL_REPLICA);
                                310                 :                : 
                                311                 :                :     /* Logical decoding is always enabled */
                                312         [ +  + ]:            493 :     if (wal_level >= WAL_LEVEL_LOGICAL)
                                313                 :            483 :         return;
                                314                 :                : 
                                315         [ +  + ]:             10 :     if (RecoveryInProgress())
                                316                 :                :     {
                                317                 :                :         /*
                                318                 :                :          * CheckLogicalDecodingRequirements() must have already errored out if
                                319                 :                :          * logical decoding is not enabled since we cannot enable the logical
                                320                 :                :          * decoding status during recovery.
                                321                 :                :          */
                                322         [ -  + ]:              3 :         Assert(IsLogicalDecodingEnabled());
                                323                 :              3 :         return;
                                324                 :                :     }
                                325                 :                : 
                                326                 :                :     /*
                                327                 :                :      * Make sure to abort the activation process in cases where there is an
                                328                 :                :      * interruption during the wait.
                                329                 :                :      */
                                330         [ +  + ]:              7 :     PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
                                331                 :                :     {
                                332                 :              7 :         EnableLogicalDecoding();
                                333                 :                :     }
                                334         [ -  + ]:              7 :     PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
                                335                 :                : }
                                336                 :                : 
                                337                 :                : /*
                                338                 :                :  * A workhorse function to enable logical decoding.
                                339                 :                :  */
                                340                 :                : void
                                341                 :             16 : EnableLogicalDecoding(void)
                                342                 :                : {
                                343                 :                :     bool        in_recovery;
                                344                 :                : 
                                345                 :             16 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                346                 :                : 
                                347                 :                :     /* Return if it is already enabled */
                                348         [ +  + ]:             16 :     if (LogicalDecodingCtl->logical_decoding_enabled)
                                349                 :                :     {
                                350                 :              2 :         LogicalDecodingCtl->pending_disable = false;
                                351                 :              2 :         LWLockRelease(LogicalDecodingControlLock);
                                352                 :              2 :         return;
                                353                 :                :     }
                                354                 :                : 
                                355                 :                :     /*
                                356                 :                :      * Set logical info WAL logging in shmem. All processes started after this
                                357                 :                :      * point will include the information required by logical decoding in WAL
                                358                 :                :      * records.
                                359                 :                :      */
                                360                 :             14 :     LogicalDecodingCtl->xlog_logical_info = true;
                                361                 :                : 
                                362                 :             14 :     LWLockRelease(LogicalDecodingControlLock);
                                363                 :                : 
                                364                 :                :     /*
                                365                 :                :      * Tell all running processes to reflect the xlog_logical_info update, and
                                366                 :                :      * wait. This ensures that all running processes have enabled logical
                                367                 :                :      * information WAL logging.
                                368                 :                :      */
                                369                 :             14 :     WaitForProcSignalBarrier(
                                370                 :                :                              EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
                                371                 :                : 
                                372                 :             14 :     INJECTION_POINT("logical-decoding-activation", NULL);
                                373                 :                : 
                                374                 :             13 :     in_recovery = RecoveryInProgress();
                                375                 :                : 
                                376                 :                :     /*
                                377                 :                :      * There could be some transactions that might have started with the old
                                378                 :                :      * status, but we don't need to wait for these transactions to complete as
                                379                 :                :      * long as they have valid XIDs. These transactions will appear in the
                                380                 :                :      * xl_running_xacts record and therefore the snapshot builder will not try
                                381                 :                :      * to decode the transaction during the logical decoding initialization.
                                382                 :                :      *
                                383                 :                :      * There is a theoretical case where a transaction decides whether to
                                384                 :                :      * include logical-info in WAL records before getting an XID. In this
                                385                 :                :      * case, the transaction won't appear in xl_running_xacts.
                                386                 :                :      *
                                387                 :                :      * For operations that do not require an XID assignment, the process
                                388                 :                :      * starts including logical-info immediately upon receiving the signal
                                389                 :                :      * (barrier). If such an operation checks the effective_wal_level multiple
                                390                 :                :      * times within a single execution, the resulting WAL records might be
                                391                 :                :      * inconsistent (i.e., logical-info is included in some records but not in
                                392                 :                :      * others). However, this is harmless because logical decoding generally
                                393                 :                :      * ignores WAL records that are not associated with an assigned XID.
                                394                 :                :      *
                                395                 :                :      * One might think we need to wait for all running transactions, including
                                396                 :                :      * those without XIDs and read-only transactions, to finish before
                                397                 :                :      * enabling logical decoding. However, such a requirement would force the
                                398                 :                :      * slot creation to wait for a potentially very long time due to
                                399                 :                :      * long-running read queries, which is practically unacceptable.
                                400                 :                :      */
                                401                 :                : 
                                402                 :             13 :     START_CRIT_SECTION();
                                403                 :                : 
                                404                 :                :     /*
                                405                 :                :      * We enable logical decoding first, followed by writing the WAL record.
                                406                 :                :      * This sequence ensures logical decoding becomes available on the primary
                                407                 :                :      * first.
                                408                 :                :      */
                                409                 :             13 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                410                 :                : 
                                411                 :             13 :     LogicalDecodingCtl->logical_decoding_enabled = true;
                                412                 :                : 
                                413         [ +  + ]:             13 :     if (!in_recovery)
                                414                 :              4 :         write_logical_decoding_status_update_record(true);
                                415                 :                : 
                                416                 :             13 :     LogicalDecodingCtl->pending_disable = false;
                                417                 :                : 
                                418                 :             13 :     LWLockRelease(LogicalDecodingControlLock);
                                419                 :                : 
                                420         [ -  + ]:             13 :     END_CRIT_SECTION();
                                421                 :                : 
                                422         [ +  + ]:             13 :     if (!in_recovery)
                                423         [ +  - ]:              4 :         ereport(LOG,
                                424                 :                :                 errmsg("logical decoding is enabled upon creating a new logical replication slot"));
                                425                 :                : }
                                426                 :                : 
                                427                 :                : /*
                                428                 :                :  * Initiate a request for disabling logical decoding.
                                429                 :                :  *
                                430                 :                :  * Note that this function does not verify whether logical slots exist. The
                                431                 :                :  * checkpointer will verify if logical decoding should actually be disabled.
                                432                 :                :  */
                                433                 :                : void
                                434                 :            425 : RequestDisableLogicalDecoding(void)
                                435                 :                : {
                                436         [ +  + ]:            425 :     if (wal_level != WAL_LEVEL_REPLICA)
                                437                 :            414 :         return;
                                438                 :                : 
                                439                 :                :     /*
                                440                 :                :      * It's possible that we might not actually need to disable logical
                                441                 :                :      * decoding if someone creates a new logical slot concurrently. We set the
                                442                 :                :      * flag anyway and the checkpointer will check it and disable logical
                                443                 :                :      * decoding if necessary.
                                444                 :                :      */
                                445                 :             11 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                446                 :             11 :     LogicalDecodingCtl->pending_disable = true;
                                447                 :             11 :     LWLockRelease(LogicalDecodingControlLock);
                                448                 :                : 
                                449                 :             11 :     WakeupCheckpointer();
                                450                 :                : 
                                451         [ +  - ]:             11 :     elog(DEBUG1, "requested disabling logical decoding");
                                452                 :                : }
                                453                 :                : 
                                454                 :                : /*
                                455                 :                :  * Disable logical decoding if necessary.
                                456                 :                :  *
                                457                 :                :  * This function disables logical decoding upon a request initiated by
                                458                 :                :  * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
                                459                 :                :  */
                                460                 :                : void
                                461                 :           4665 : DisableLogicalDecodingIfNecessary(void)
                                462                 :                : {
                                463                 :                :     bool        pending_disable;
                                464                 :                : 
                                465         [ +  + ]:           4665 :     if (wal_level != WAL_LEVEL_REPLICA)
                                466                 :           1206 :         return;
                                467                 :                : 
                                468                 :                :     /*
                                469                 :                :      * Sanity check as we cannot disable logical decoding while holding a
                                470                 :                :      * logical slot.
                                471                 :                :      */
                                472         [ -  + ]:           3459 :     Assert(!MyReplicationSlot);
                                473                 :                : 
                                474         [ +  + ]:           3459 :     if (RecoveryInProgress())
                                475                 :           1738 :         return;
                                476                 :                : 
                                477                 :           1721 :     LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
                                478                 :           1721 :     pending_disable = LogicalDecodingCtl->pending_disable;
                                479                 :           1721 :     LWLockRelease(LogicalDecodingControlLock);
                                480                 :                : 
                                481                 :                :     /* Quick return if no pending disable request */
                                482         [ +  + ]:           1721 :     if (!pending_disable)
                                483                 :           1711 :         return;
                                484                 :                : 
                                485                 :             10 :     DisableLogicalDecoding();
                                486                 :                : }
                                487                 :                : 
                                488                 :                : /*
                                489                 :                :  * A workhorse function to disable logical decoding.
                                490                 :                :  */
                                491                 :                : void
                                492                 :             19 : DisableLogicalDecoding(void)
                                493                 :                : {
                                494                 :             19 :     bool        in_recovery = RecoveryInProgress();
                                495                 :                : 
                                496                 :             19 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                497                 :                : 
                                498                 :                :     /*
                                499                 :                :      * Check if we can disable logical decoding.
                                500                 :                :      *
                                501                 :                :      * Skip CheckLogicalSlotExists() check during recovery because the
                                502                 :                :      * existing slots will be invalidated after disabling logical decoding.
                                503                 :                :      */
                                504         [ +  + ]:             19 :     if (!LogicalDecodingCtl->logical_decoding_enabled ||
                                505   [ +  +  +  + ]:             16 :         (!in_recovery && CheckLogicalSlotExists()))
                                506                 :                :     {
                                507                 :              4 :         LogicalDecodingCtl->pending_disable = false;
                                508                 :              4 :         LWLockRelease(LogicalDecodingControlLock);
                                509                 :              4 :         return;
                                510                 :                :     }
                                511                 :                : 
                                512                 :             15 :     START_CRIT_SECTION();
                                513                 :                : 
                                514                 :                :     /*
                                515                 :                :      * We need to disable logical decoding first and then disable logical
                                516                 :                :      * information WAL logging in order to ensure that no logical decoding
                                517                 :                :      * processes WAL records with insufficient information.
                                518                 :                :      */
                                519                 :             15 :     LogicalDecodingCtl->logical_decoding_enabled = false;
                                520                 :                : 
                                521                 :                :     /* Write the WAL record to disable logical decoding on standbys too */
                                522         [ +  + ]:             15 :     if (!in_recovery)
                                523                 :              6 :         write_logical_decoding_status_update_record(false);
                                524                 :                : 
                                525                 :                :     /* Now disable logical information WAL logging */
                                526                 :             15 :     LogicalDecodingCtl->xlog_logical_info = false;
                                527                 :             15 :     LogicalDecodingCtl->pending_disable = false;
                                528                 :                : 
                                529         [ -  + ]:             15 :     END_CRIT_SECTION();
                                530                 :                : 
                                531         [ +  + ]:             15 :     if (!in_recovery)
                                532         [ +  - ]:              6 :         ereport(LOG,
                                533                 :                :                 errmsg("logical decoding is disabled because there are no valid logical replication slots"));
                                534                 :                : 
                                535                 :             15 :     LWLockRelease(LogicalDecodingControlLock);
                                536                 :                : 
                                537                 :                :     /*
                                538                 :                :      * Tell all running processes to reflect the xlog_logical_info update.
                                539                 :                :      * Unlike when enabling logical decoding, we don't need to wait for all
                                540                 :                :      * processes to complete it in this case. We already disabled logical
                                541                 :                :      * decoding and it's always safe to write logical information to WAL
                                542                 :                :      * records, even when not strictly required. Therefore, we don't need to
                                543                 :                :      * wait for all running transactions to finish either.
                                544                 :                :      */
                                545                 :             15 :     EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
                                546                 :                : }
                                547                 :                : 
                                548                 :                : /*
                                549                 :                :  * Updates the logical decoding status at end of recovery, and ensures that
                                550                 :                :  * all running processes have the updated XLogLogicalInfo status. This
                                551                 :                :  * function must be called before accepting writes.
                                552                 :                :  */
                                553                 :                : void
                                554                 :            939 : UpdateLogicalDecodingStatusEndOfRecovery(void)
                                555                 :                : {
                                556                 :            939 :     bool        new_status = false;
                                557                 :                : 
                                558         [ -  + ]:            939 :     Assert(RecoveryInProgress());
                                559                 :                : 
                                560                 :                :     /*
                                561                 :                :      * With 'minimal' WAL level, there are no logical replication slots during
                                562                 :                :      * recovery. Logical decoding is always disabled, so there is no need to
                                563                 :                :      * synchronize XLogLogicalInfo.
                                564                 :                :      */
                                565         [ +  + ]:            939 :     if (wal_level == WAL_LEVEL_MINIMAL)
                                566                 :                :     {
                                567   [ +  -  -  + ]:            351 :         Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
                                568                 :            351 :         return;
                                569                 :                :     }
                                570                 :                : 
                                571                 :            588 :     LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
                                572                 :                : 
                                573   [ +  +  +  + ]:            588 :     if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
                                574                 :            131 :         new_status = true;
                                575                 :                : 
                                576                 :                :     /*
                                577                 :                :      * When recovery ends, we need to either enable or disable logical
                                578                 :                :      * decoding based on the wal_level setting and the presence of logical
                                579                 :                :      * slots. We need to note that concurrent slot creation and deletion could
                                580                 :                :      * happen but WAL writes are still not permitted until recovery fully
                                581                 :                :      * completes. Here's how we handle concurrent toggling of logical
                                582                 :                :      * decoding:
                                583                 :                :      *
                                584                 :                :      * For the 'enable' case, if there's a concurrent disable request before
                                585                 :                :      * recovery fully completes, the checkpointer will handle it after
                                586                 :                :      * recovery is done. This means there might be a brief period after
                                587                 :                :      * recovery where logical decoding remains enabled even with no logical
                                588                 :                :      * replication slots present. This temporary state is not new - it can
                                589                 :                :      * already occur due to the checkpointer's asynchronous deactivation
                                590                 :                :      * process.
                                591                 :                :      *
                                592                 :                :      * For the 'disable' case, backends cannot create logical replication slots
                                593                 :                :      * during recovery (see checks in CheckLogicalDecodingRequirements()),
                                594                 :                :      * which prevents a race condition between disabling logical decoding and
                                595                 :                :      * concurrent slot creation.
                                596                 :                :      */
                                597         [ +  + ]:            588 :     if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
                                598                 :                :     {
                                599                 :                :         /*
                                600                 :                :          * Update both the logical decoding status and logical WAL logging
                                601                 :                :          * status. Unlike toggling these statuses outside recovery, we don't
                                602                 :                :          * need to worry about the operation order because WAL writes are not
                                603                 :                :          * yet permitted.
                                604                 :                :          */
                                605                 :             73 :         LogicalDecodingCtl->xlog_logical_info = new_status;
                                606                 :             73 :         LogicalDecodingCtl->logical_decoding_enabled = new_status;
                                607                 :                : 
                                608         [ +  + ]:             73 :         elog(DEBUG1,
                                609                 :                :              "update logical decoding status to %d at the end of recovery",
                                610                 :                :              new_status);
                                611                 :                : 
                                612                 :                :         /*
                                613                 :                :          * Now that we updated the logical decoding status, clear the pending
                                614                 :                :          * disable flag. It's possible that a concurrent process drops the
                                615                 :                :          * last logical slot and initiates the pending disable again. The
                                616                 :                :          * checkpointer process will check it.
                                617                 :                :          */
                                618                 :             73 :         LogicalDecodingCtl->pending_disable = false;
                                619                 :                : 
                                620                 :             73 :         LWLockRelease(LogicalDecodingControlLock);
                                621                 :                : 
                                622                 :             73 :         write_logical_decoding_status_update_record(new_status);
                                623                 :                :     }
                                624                 :                :     else
                                625                 :            515 :         LWLockRelease(LogicalDecodingControlLock);
                                626                 :                : 
                                627                 :                :     /*
                                628                 :                :      * Ensure all running processes have the updated status. We don't need to
                                629                 :                :      * wait for running transactions to finish as we don't accept any writes
                                630                 :                :      * yet. On the other hand, we need to wait for XLogLogicalInfo to be
                                631                 :                :      * synchronized even if we haven't updated the status above, because the
                                632                 :                :      * status may have been turned on and off during recovery, leaving running
                                633                 :                :      * processes with different statuses in their local caches.
                                634                 :                :      */
                                635         [ +  + ]:            588 :     if (IsUnderPostmaster)
                                636                 :            467 :         WaitForProcSignalBarrier(
                                637                 :                :                                  EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
                                638                 :                : 
                                639                 :            588 :     INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
                                640                 :                : }
        

Generated by: LCOV version 2.4-beta