LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC UIC UBC GBC GNC CBC DCB
Current: 806555e3000d0b0e0c536c1dc65548128d457d86 vs 1d325ad99cb2dec0e8b45ba36909ee0a497d2a57 Lines: 89.1 % 589 525 6 58 111 414 16
Current Date: 2025-12-17 08:58:58 +0900 Functions: 100.0 % 37 37 15 22 3
Baseline: lcov-20251217-005640-baseline Branches: 65.4 % 402 263 36 2 101 2 70 191
Baseline Date: 2025-12-16 12:57:12 -0800 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(1,7] days: 100.0 % 1 1 1
(30,360] days: 94.7 % 131 124 6 1 110 14
(360..) days: 87.5 % 457 400 57 400
Function coverage date bins:
(30,360] days: 100.0 % 9 9 9
(360..) days: 100.0 % 28 28 6 22
Branch coverage date bins:
(30,360] days: 66.1 % 112 74 36 2 70 4
(360..) days: 65.2 % 290 189 2 99 2 187

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * launcher.c
                                  3                 :                :  *     PostgreSQL logical replication worker launcher process
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/launcher.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This module contains the logical replication worker launcher which
                                 12                 :                :  *    uses the background worker infrastructure to start the logical
                                 13                 :                :  *    replication workers for every enabled subscription.
                                 14                 :                :  *
                                 15                 :                :  *-------------------------------------------------------------------------
                                 16                 :                :  */
                                 17                 :                : 
                                 18                 :                : #include "postgres.h"
                                 19                 :                : 
                                 20                 :                : #include "access/heapam.h"
                                 21                 :                : #include "access/htup.h"
                                 22                 :                : #include "access/htup_details.h"
                                 23                 :                : #include "access/tableam.h"
                                 24                 :                : #include "access/xact.h"
                                 25                 :                : #include "catalog/pg_subscription.h"
                                 26                 :                : #include "catalog/pg_subscription_rel.h"
                                 27                 :                : #include "funcapi.h"
                                 28                 :                : #include "lib/dshash.h"
                                 29                 :                : #include "miscadmin.h"
                                 30                 :                : #include "pgstat.h"
                                 31                 :                : #include "postmaster/bgworker.h"
                                 32                 :                : #include "postmaster/interrupt.h"
                                 33                 :                : #include "replication/logicallauncher.h"
                                 34                 :                : #include "replication/origin.h"
                                 35                 :                : #include "replication/slot.h"
                                 36                 :                : #include "replication/walreceiver.h"
                                 37                 :                : #include "replication/worker_internal.h"
                                 38                 :                : #include "storage/ipc.h"
                                 39                 :                : #include "storage/proc.h"
                                 40                 :                : #include "storage/procarray.h"
                                 41                 :                : #include "tcop/tcopprot.h"
                                 42                 :                : #include "utils/builtins.h"
                                 43                 :                : #include "utils/memutils.h"
                                 44                 :                : #include "utils/pg_lsn.h"
                                 45                 :                : #include "utils/snapmgr.h"
                                 46                 :                : #include "utils/syscache.h"
                                 47                 :                : 
                                 48                 :                : /* max sleep time between cycles (3min) */
                                 49                 :                : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
                                 50                 :                : 
                                 51                 :                : /* GUC variables */
                                 52                 :                : int         max_logical_replication_workers = 4;
                                 53                 :                : int         max_sync_workers_per_subscription = 2;
                                 54                 :                : int         max_parallel_apply_workers_per_subscription = 2;
                                 55                 :                : 
                                 56                 :                : LogicalRepWorker *MyLogicalRepWorker = NULL;
                                 57                 :                : 
                                 58                 :                : typedef struct LogicalRepCtxStruct
                                 59                 :                : {
                                 60                 :                :     /* Supervisor process. */
                                 61                 :                :     pid_t       launcher_pid;
                                 62                 :                : 
                                 63                 :                :     /* Hash table holding last start times of subscriptions' apply workers. */
                                 64                 :                :     dsa_handle  last_start_dsa;
                                 65                 :                :     dshash_table_handle last_start_dsh;
                                 66                 :                : 
                                 67                 :                :     /* Background workers. */
                                 68                 :                :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
                                 69                 :                : } LogicalRepCtxStruct;
                                 70                 :                : 
                                 71                 :                : static LogicalRepCtxStruct *LogicalRepCtx;
                                 72                 :                : 
                                 73                 :                : /* an entry in the last-start-times shared hash table */
                                 74                 :                : typedef struct LauncherLastStartTimesEntry
                                 75                 :                : {
                                 76                 :                :     Oid         subid;          /* OID of logrep subscription (hash key) */
                                 77                 :                :     TimestampTz last_start_time;    /* last time its apply worker was started */
                                 78                 :                : } LauncherLastStartTimesEntry;
                                 79                 :                : 
                                 80                 :                : /* parameters for the last-start-times shared hash table */
                                 81                 :                : static const dshash_parameters dsh_params = {
                                 82                 :                :     sizeof(Oid),
                                 83                 :                :     sizeof(LauncherLastStartTimesEntry),
                                 84                 :                :     dshash_memcmp,
                                 85                 :                :     dshash_memhash,
                                 86                 :                :     dshash_memcpy,
                                 87                 :                :     LWTRANCHE_LAUNCHER_HASH
                                 88                 :                : };
                                 89                 :                : 
                                 90                 :                : static dsa_area *last_start_times_dsa = NULL;
                                 91                 :                : static dshash_table *last_start_times = NULL;
                                 92                 :                : 
                                 93                 :                : static bool on_commit_launcher_wakeup = false;
                                 94                 :                : 
                                 95                 :                : 
                                 96                 :                : static void logicalrep_launcher_onexit(int code, Datum arg);
                                 97                 :                : static void logicalrep_worker_onexit(int code, Datum arg);
                                 98                 :                : static void logicalrep_worker_detach(void);
                                 99                 :                : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
                                100                 :                : static int  logicalrep_pa_worker_count(Oid subid);
                                101                 :                : static void logicalrep_launcher_attach_dshmem(void);
                                102                 :                : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
                                103                 :                : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
                                104                 :                : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
                                105                 :                : static bool acquire_conflict_slot_if_exists(void);
                                106                 :                : static void update_conflict_slot_xmin(TransactionId new_xmin);
                                107                 :                : static void init_conflict_slot_xmin(void);
                                108                 :                : 
                                109                 :                : 
                                110                 :                : /*
                                111                 :                :  * Load the list of subscriptions.
                                112                 :                :  *
                                113                 :                :  * Only the fields interesting for worker start/stop functions are filled for
                                114                 :                :  * each subscription.
                                115                 :                :  */
                                116                 :                : static List *
 3254 peter_e@gmx.net           117                 :CBC        2789 : get_subscription_list(void)
                                118                 :                : {
                                119                 :           2789 :     List       *res = NIL;
                                120                 :                :     Relation    rel;
                                121                 :                :     TableScanDesc scan;
                                122                 :                :     HeapTuple   tup;
                                123                 :                :     MemoryContext resultcxt;
                                124                 :                : 
                                125                 :                :     /* This is the context that we will allocate our output data in */
                                126                 :           2789 :     resultcxt = CurrentMemoryContext;
                                127                 :                : 
                                128                 :                :     /*
                                129                 :                :      * Start a transaction so we can access pg_subscription.
                                130                 :                :      */
                                131                 :           2789 :     StartTransactionCommand();
                                132                 :                : 
 2522 andres@anarazel.de        133                 :           2789 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
 2473                           134                 :           2789 :     scan = table_beginscan_catalog(rel, 0, NULL);
                                135                 :                : 
 3254 peter_e@gmx.net           136         [ +  + ]:           3617 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                137                 :                :     {
                                138                 :            828 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
                                139                 :                :         Subscription *sub;
                                140                 :                :         MemoryContext oldcxt;
                                141                 :                : 
                                142                 :                :         /*
                                143                 :                :          * Allocate our results in the caller's context, not the
                                144                 :                :          * transaction's. We do this inside the loop, and restore the original
                                145                 :                :          * context at the end, so that leaky things like heap_getnext() are
                                146                 :                :          * not called in a potentially long-lived context.
                                147                 :                :          */
                                148                 :            828 :         oldcxt = MemoryContextSwitchTo(resultcxt);
                                149                 :                : 
    7 michael@paquier.xyz       150                 :GNC         828 :         sub = palloc0_object(Subscription);
 2584 andres@anarazel.de        151                 :CBC         828 :         sub->oid = subform->oid;
 3254 peter_e@gmx.net           152                 :            828 :         sub->dbid = subform->subdbid;
                                153                 :            828 :         sub->owner = subform->subowner;
                                154                 :            828 :         sub->enabled = subform->subenabled;
                                155                 :            828 :         sub->name = pstrdup(NameStr(subform->subname));
  147 akapila@postgresql.o      156                 :GNC         828 :         sub->retaindeadtuples = subform->subretaindeadtuples;
  106                           157                 :            828 :         sub->retentionactive = subform->subretentionactive;
                                158                 :                :         /* We don't fill fields we are not interested in. */
                                159                 :                : 
 3254 peter_e@gmx.net           160                 :CBC         828 :         res = lappend(res, sub);
                                161                 :            828 :         MemoryContextSwitchTo(oldcxt);
                                162                 :                :     }
                                163                 :                : 
 2473 andres@anarazel.de        164                 :           2789 :     table_endscan(scan);
 2522                           165                 :           2789 :     table_close(rel, AccessShareLock);
                                166                 :                : 
 3254 peter_e@gmx.net           167                 :           2789 :     CommitTransactionCommand();
                                168                 :                : 
                                169                 :           2789 :     return res;
                                170                 :                : }
                                171                 :                : 
                                172                 :                : /*
                                173                 :                :  * Wait for a background worker to start up and attach to the shmem context.
                                174                 :                :  *
                                175                 :                :  * This is only needed for cleaning up the shared memory in case the worker
                                176                 :                :  * fails to attach.
                                177                 :                :  *
                                178                 :                :  * Returns whether the attach was successful.
                                179                 :                :  */
                                180                 :                : static bool
                                181                 :            418 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                182                 :                :                                uint16 generation,
                                183                 :                :                                BackgroundWorkerHandle *handle)
                                184                 :                : {
  176 tgl@sss.pgh.pa.us         185                 :            418 :     bool        result = false;
                                186                 :            418 :     bool        dropped_latch = false;
                                187                 :                : 
                                188                 :                :     for (;;)
 3254 peter_e@gmx.net           189                 :           1144 :     {
                                190                 :                :         BgwHandleStatus status;
                                191                 :                :         pid_t       pid;
                                192                 :                :         int         rc;
                                193                 :                : 
                                194         [ -  + ]:           1562 :         CHECK_FOR_INTERRUPTS();
                                195                 :                : 
 3157                           196                 :           1562 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                197                 :                : 
                                198                 :                :         /* Worker either died or has started. Return false if died. */
                                199   [ +  +  +  + ]:           1562 :         if (!worker->in_use || worker->proc)
                                200                 :                :         {
  176 tgl@sss.pgh.pa.us         201                 :            415 :             result = worker->in_use;
 3157 peter_e@gmx.net           202                 :            415 :             LWLockRelease(LogicalRepWorkerLock);
  176 tgl@sss.pgh.pa.us         203                 :            415 :             break;
                                204                 :                :         }
                                205                 :                : 
 3157 peter_e@gmx.net           206                 :           1147 :         LWLockRelease(LogicalRepWorkerLock);
                                207                 :                : 
                                208                 :                :         /* Check if worker has died before attaching, and clean up after it. */
 3254                           209                 :           1147 :         status = GetBackgroundWorkerPid(handle, &pid);
                                210                 :                : 
                                211         [ -  + ]:           1147 :         if (status == BGWH_STOPPED)
                                212                 :                :         {
 3157 peter_e@gmx.net           213                 :UBC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                214                 :                :             /* Ensure that this was indeed the worker we waited for. */
                                215         [ #  # ]:              0 :             if (generation == worker->generation)
                                216                 :              0 :                 logicalrep_worker_cleanup(worker);
                                217                 :              0 :             LWLockRelease(LogicalRepWorkerLock);
  176 tgl@sss.pgh.pa.us         218                 :              0 :             break;              /* result is already false */
                                219                 :                :         }
                                220                 :                : 
                                221                 :                :         /*
                                222                 :                :          * We need timeout because we generally don't get notified via latch
                                223                 :                :          * about the worker attach.  But we don't expect to have to wait long.
                                224                 :                :          */
 3254 peter_e@gmx.net           225                 :CBC        1147 :         rc = WaitLatch(MyLatch,
                                226                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                227                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                228                 :                : 
 3116 andres@anarazel.de        229         [ +  + ]:           1147 :         if (rc & WL_LATCH_SET)
                                230                 :                :         {
                                231                 :            478 :             ResetLatch(MyLatch);
                                232         [ +  + ]:            478 :             CHECK_FOR_INTERRUPTS();
  176 tgl@sss.pgh.pa.us         233                 :            475 :             dropped_latch = true;
                                234                 :                :         }
                                235                 :                :     }
                                236                 :                : 
                                237                 :                :     /*
                                238                 :                :      * If we had to clear a latch event in order to wait, be sure to restore
                                239                 :                :      * it before exiting.  Otherwise caller may miss events.
                                240                 :                :      */
                                241         [ +  - ]:            415 :     if (dropped_latch)
                                242                 :            415 :         SetLatch(MyLatch);
                                243                 :                : 
                                244                 :            415 :     return result;
                                245                 :                : }
                                246                 :                : 
                                247                 :                : /*
                                248                 :                :  * Walks the workers array and searches for one that matches given worker type,
                                249                 :                :  * subscription id, and relation id.
                                250                 :                :  *
                                251                 :                :  * For both apply workers and sequencesync workers, the relid should be set to
                                252                 :                :  * InvalidOid, as these workers handle changes across all tables and sequences
                                253                 :                :  * respectively, rather than targeting a specific relation. For tablesync
                                254                 :                :  * workers, the relid should be set to the OID of the relation being
                                255                 :                :  * synchronized.
                                256                 :                :  */
                                257                 :                : LogicalRepWorker *
   50 akapila@postgresql.o      258                 :GNC        3153 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
                                259                 :                :                        bool only_running)
                                260                 :                : {
                                261                 :                :     int         i;
 3136 bruce@momjian.us          262                 :CBC        3153 :     LogicalRepWorker *res = NULL;
                                263                 :                : 
                                264                 :                :     /* relid must be valid only for table sync workers */
   50 akapila@postgresql.o      265         [ -  + ]:GNC        3153 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
 3254 peter_e@gmx.net           266         [ -  + ]:CBC        3153 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                267                 :                : 
                                268                 :                :     /* Search for an attached worker that matches the specified criteria. */
                                269         [ +  + ]:           9680 :     for (i = 0; i < max_logical_replication_workers; i++)
                                270                 :                :     {
 3136 bruce@momjian.us          271                 :           8462 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                272                 :                : 
                                273                 :                :         /* Skip parallel apply workers. */
 1073 akapila@postgresql.o      274   [ +  +  -  + ]:           8462 :         if (isParallelApplyWorker(w))
 1073 akapila@postgresql.o      275                 :UBC           0 :             continue;
                                276                 :                : 
 3157 peter_e@gmx.net           277   [ +  +  +  +  :CBC        8462 :         if (w->in_use && w->subid == subid && w->relid == relid &&
                                              +  + ]
   50 akapila@postgresql.o      278   [ +  +  +  +  :GNC        1963 :             w->type == wtype && (!only_running || w->proc))
                                              +  - ]
                                279                 :                :         {
 3254 peter_e@gmx.net           280                 :CBC        1935 :             res = w;
                                281                 :           1935 :             break;
                                282                 :                :         }
                                283                 :                :     }
                                284                 :                : 
                                285                 :           3153 :     return res;
                                286                 :                : }
                                287                 :                : 
                                288                 :                : /*
                                289                 :                :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
                                290                 :                :  * the subscription, instead of just one.
                                291                 :                :  */
                                292                 :                : List *
  511 akapila@postgresql.o      293                 :            663 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
                                294                 :                : {
                                295                 :                :     int         i;
 3057 peter_e@gmx.net           296                 :            663 :     List       *res = NIL;
                                297                 :                : 
  511 akapila@postgresql.o      298         [ +  + ]:            663 :     if (acquire_lock)
                                299                 :            120 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                300                 :                : 
 3057 peter_e@gmx.net           301         [ -  + ]:            663 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                302                 :                : 
                                303                 :                :     /* Search for attached worker for a given subscription id. */
                                304         [ +  + ]:           3443 :     for (i = 0; i < max_logical_replication_workers; i++)
                                305                 :                :     {
                                306                 :           2780 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                307                 :                : 
                                308   [ +  +  +  +  :           2780 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
                                        +  +  +  + ]
                                309                 :            480 :             res = lappend(res, w);
                                310                 :                :     }
                                311                 :                : 
  511 akapila@postgresql.o      312         [ +  + ]:            663 :     if (acquire_lock)
                                313                 :            120 :         LWLockRelease(LogicalRepWorkerLock);
                                314                 :                : 
 3057 peter_e@gmx.net           315                 :            663 :     return res;
                                316                 :                : }
                                317                 :                : 
                                318                 :                : /*
                                319                 :                :  * Start new logical replication background worker, if possible.
                                320                 :                :  *
                                321                 :                :  * Returns true on success, false on failure.
                                322                 :                :  */
                                323                 :                : bool
  856 akapila@postgresql.o      324                 :            418 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                325                 :                :                          Oid dbid, Oid subid, const char *subname, Oid userid,
                                326                 :                :                          Oid relid, dsm_handle subworker_dsm,
                                327                 :                :                          bool retain_dead_tuples)
                                328                 :                : {
                                329                 :                :     BackgroundWorker bgw;
                                330                 :                :     BackgroundWorkerHandle *bgw_handle;
                                331                 :                :     uint16      generation;
                                332                 :                :     int         i;
 3136 bruce@momjian.us          333                 :            418 :     int         slot = 0;
                                334                 :            418 :     LogicalRepWorker *worker = NULL;
                                335                 :                :     int         nsyncworkers;
                                336                 :                :     int         nparallelapplyworkers;
                                337                 :                :     TimestampTz now;
  856 akapila@postgresql.o      338                 :            418 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
   42 akapila@postgresql.o      339                 :GNC         418 :     bool        is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
  856 akapila@postgresql.o      340                 :CBC         418 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
                                341                 :                : 
                                342                 :                :     /*----------
                                343                 :                :      * Sanity checks:
                                344                 :                :      * - must be valid worker type
                                345                 :                :      * - tablesync workers are the only ones to have relid
                                346                 :                :      * - parallel apply worker is the only kind of subworker
                                347                 :                :      * - The replication slot used in conflict detection is created when
                                348                 :                :      *   retain_dead_tuples is enabled
                                349                 :                :      */
                                350         [ -  + ]:            418 :     Assert(wtype != WORKERTYPE_UNKNOWN);
                                351         [ -  + ]:            418 :     Assert(is_tablesync_worker == OidIsValid(relid));
                                352         [ -  + ]:            418 :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
  147 akapila@postgresql.o      353   [ +  +  -  + ]:GNC         418 :     Assert(!retain_dead_tuples || MyReplicationSlot);
                                354                 :                : 
 3129 peter_e@gmx.net           355         [ +  + ]:CBC         418 :     ereport(DEBUG1,
                                356                 :                :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                                357                 :                :                              subname)));
                                358                 :                : 
                                359                 :                :     /* Report this after the initial starting message for consistency. */
  271 msawada@postgresql.o      360         [ -  + ]:            418 :     if (max_active_replication_origins == 0)
 3254 peter_e@gmx.net           361         [ #  # ]:UBC           0 :         ereport(ERROR,
                                362                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                363                 :                :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
                                364                 :                : 
                                365                 :                :     /*
                                366                 :                :      * We need to modify the shared memory under lock so that we have a
                                367                 :                :      * consistent view.
                                368                 :                :      */
 3254 peter_e@gmx.net           369                 :CBC         418 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                370                 :                : 
 3157                           371                 :            418 : retry:
                                372                 :                :     /* Find unused worker slot. */
                                373         [ +  - ]:            736 :     for (i = 0; i < max_logical_replication_workers; i++)
                                374                 :                :     {
                                375                 :            736 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                376                 :                : 
                                377         [ +  + ]:            736 :         if (!w->in_use)
                                378                 :                :         {
                                379                 :            418 :             worker = w;
                                380                 :            418 :             slot = i;
 3254                           381                 :            418 :             break;
                                382                 :                :         }
                                383                 :                :     }
                                384                 :                : 
 3157                           385                 :            418 :     nsyncworkers = logicalrep_sync_worker_count(subid);
                                386                 :                : 
                                387                 :            418 :     now = GetCurrentTimestamp();
                                388                 :                : 
                                389                 :                :     /*
                                390                 :                :      * If we didn't find a free slot, or the sync worker limit is reached, try
                                391                 :                :      * garbage collection.  We do this because if some worker failed to start
                                392                 :                :      * up and its parent crashed while waiting, in_use was never cleared.
                                393                 :                :      */
                                394   [ +  -  -  + ]:            418 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
                                395                 :                :     {
 3136 bruce@momjian.us          396                 :UBC           0 :         bool        did_cleanup = false;
                                397                 :                : 
 3157 peter_e@gmx.net           398         [ #  # ]:              0 :         for (i = 0; i < max_logical_replication_workers; i++)
                                399                 :                :         {
                                400                 :              0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                401                 :                : 
                                402                 :                :             /*
                                403                 :                :              * If the worker was marked in use but didn't manage to attach in
                                404                 :                :              * time, clean it up.
                                405                 :                :              */
                                406   [ #  #  #  #  :              0 :             if (w->in_use && !w->proc &&
                                              #  # ]
                                407                 :              0 :                 TimestampDifferenceExceeds(w->launch_time, now,
                                408                 :                :                                            wal_receiver_timeout))
                                409                 :                :             {
                                410         [ #  # ]:              0 :                 elog(WARNING,
                                411                 :                :                      "logical replication worker for subscription %u took too long to start; canceled",
                                412                 :                :                      w->subid);
                                413                 :                : 
                                414                 :              0 :                 logicalrep_worker_cleanup(w);
                                415                 :              0 :                 did_cleanup = true;
                                416                 :                :             }
                                417                 :                :         }
                                418                 :                : 
                                419         [ #  # ]:              0 :         if (did_cleanup)
                                420                 :              0 :             goto retry;
                                421                 :                :     }
                                422                 :                : 
                                423                 :                :     /*
                                424                 :                :      * We don't allow launching more sync workers once we have reached the
                                425                 :                :      * sync worker limit per subscription. So, just return silently as we
                                426                 :                :      * might get here because of an otherwise harmless race condition.
                                427                 :                :      */
   42 akapila@postgresql.o      428   [ +  +  +  + ]:GNC         418 :     if ((is_tablesync_worker || is_sequencesync_worker) &&
                                429         [ -  + ]:            207 :         nsyncworkers >= max_sync_workers_per_subscription)
                                430                 :                :     {
 3157 peter_e@gmx.net           431                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 1073 akapila@postgresql.o      432                 :              0 :         return false;
                                433                 :                :     }
                                434                 :                : 
 1073 akapila@postgresql.o      435                 :CBC         418 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
                                436                 :                : 
                                437                 :                :     /*
                                438                 :                :      * Return false if the number of parallel apply workers reached the limit
                                439                 :                :      * per subscription.
                                440                 :                :      */
                                441         [ +  + ]:            418 :     if (is_parallel_apply_worker &&
                                442         [ -  + ]:             10 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
                                443                 :                :     {
 1073 akapila@postgresql.o      444                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                445                 :              0 :         return false;
                                446                 :                :     }
                                447                 :                : 
                                448                 :                :     /*
                                449                 :                :      * However, if there are no more free worker slots, inform the user about
                                450                 :                :      * it before returning.
                                451                 :                :      */
 3254 peter_e@gmx.net           452         [ -  + ]:CBC         418 :     if (worker == NULL)
                                453                 :                :     {
 3249 fujii@postgresql.org      454                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3254 peter_e@gmx.net           455         [ #  # ]:              0 :         ereport(WARNING,
                                456                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                457                 :                :                  errmsg("out of logical replication worker slots"),
                                458                 :                :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
 1073 akapila@postgresql.o      459                 :              0 :         return false;
                                460                 :                :     }
                                461                 :                : 
                                462                 :                :     /* Prepare the worker slot. */
  856 akapila@postgresql.o      463                 :CBC         418 :     worker->type = wtype;
 3157 peter_e@gmx.net           464                 :            418 :     worker->launch_time = now;
                                465                 :            418 :     worker->in_use = true;
                                466                 :            418 :     worker->generation++;
 3191                           467                 :            418 :     worker->proc = NULL;
 3254                           468                 :            418 :     worker->dbid = dbid;
                                469                 :            418 :     worker->userid = userid;
                                470                 :            418 :     worker->subid = subid;
 3191                           471                 :            418 :     worker->relid = relid;
                                472                 :            418 :     worker->relstate = SUBREL_STATE_UNKNOWN;
                                473                 :            418 :     worker->relstate_lsn = InvalidXLogRecPtr;
 1567 akapila@postgresql.o      474                 :            418 :     worker->stream_fileset = NULL;
 1064                           475         [ +  + ]:            418 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 1073                           476                 :            418 :     worker->parallel_apply = is_parallel_apply_worker;
  147 akapila@postgresql.o      477                 :GNC         418 :     worker->oldest_nonremovable_xid = retain_dead_tuples
                                478                 :              1 :         ? MyReplicationSlot->data.xmin
                                479         [ +  + ]:            418 :         : InvalidTransactionId;
 3191 peter_e@gmx.net           480                 :CBC         418 :     worker->last_lsn = InvalidXLogRecPtr;
                                481                 :            418 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
                                482                 :            418 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
                                483                 :            418 :     worker->reply_lsn = InvalidXLogRecPtr;
                                484                 :            418 :     TIMESTAMP_NOBEGIN(worker->reply_time);
   42 akapila@postgresql.o      485                 :GNC         418 :     worker->last_seqsync_start_time = 0;
                                486                 :                : 
                                487                 :                :     /* Before releasing lock, remember generation for future identification. */
 3012 tgl@sss.pgh.pa.us         488                 :CBC         418 :     generation = worker->generation;
                                489                 :                : 
 3254 peter_e@gmx.net           490                 :            418 :     LWLockRelease(LogicalRepWorkerLock);
                                491                 :                : 
                                492                 :                :     /* Register the new dynamic worker. */
 3167 tgl@sss.pgh.pa.us         493                 :            418 :     memset(&bgw, 0, sizeof(bgw));
 3136 bruce@momjian.us          494                 :            418 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                495                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3254 peter_e@gmx.net           496                 :            418 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  898 nathan@postgresql.or      497                 :            418 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
                                498                 :                : 
  848 akapila@postgresql.o      499   [ +  +  +  +  :            418 :     switch (worker->type)
                                              -  - ]
                                500                 :                :     {
                                501                 :            201 :         case WORKERTYPE_APPLY:
                                502                 :            201 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
                                503                 :            201 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                504                 :                :                      "logical replication apply worker for subscription %u",
                                505                 :                :                      subid);
                                506                 :            201 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
                                507                 :            201 :             break;
                                508                 :                : 
                                509                 :             10 :         case WORKERTYPE_PARALLEL_APPLY:
                                510                 :             10 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
                                511                 :             10 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                512                 :                :                      "logical replication parallel apply worker for subscription %u",
                                513                 :                :                      subid);
                                514                 :             10 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
                                515                 :                : 
                                516                 :             10 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
                                517                 :             10 :             break;
                                518                 :                : 
   42 akapila@postgresql.o      519                 :GNC           9 :         case WORKERTYPE_SEQUENCESYNC:
                                520                 :              9 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
                                521                 :              9 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                522                 :                :                      "logical replication sequencesync worker for subscription %u",
                                523                 :                :                      subid);
                                524                 :              9 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
                                525                 :              9 :             break;
                                526                 :                : 
  848 akapila@postgresql.o      527                 :CBC         198 :         case WORKERTYPE_TABLESYNC:
   42 akapila@postgresql.o      528                 :GNC         198 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
  848 akapila@postgresql.o      529                 :CBC         198 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                530                 :                :                      "logical replication tablesync worker for subscription %u sync %u",
                                531                 :                :                      subid,
                                532                 :                :                      relid);
                                533                 :            198 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
                                534                 :            198 :             break;
                                535                 :                : 
  848 akapila@postgresql.o      536                 :UBC           0 :         case WORKERTYPE_UNKNOWN:
                                537                 :                :             /* Should never happen. */
                                538         [ #  # ]:              0 :             elog(ERROR, "unknown worker type");
                                539                 :                :     }
                                540                 :                : 
 3254 peter_e@gmx.net           541                 :CBC         418 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
                                542                 :            418 :     bgw.bgw_notify_pid = MyProcPid;
 3164 fujii@postgresql.org      543                 :            418 :     bgw.bgw_main_arg = Int32GetDatum(slot);
                                544                 :                : 
 3254 peter_e@gmx.net           545         [ -  + ]:            418 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
                                546                 :                :     {
                                547                 :                :         /* Failed to start worker, so clean up the worker slot. */
 3012 tgl@sss.pgh.pa.us         548                 :UBC           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                549         [ #  # ]:              0 :         Assert(generation == worker->generation);
                                550                 :              0 :         logicalrep_worker_cleanup(worker);
                                551                 :              0 :         LWLockRelease(LogicalRepWorkerLock);
                                552                 :                : 
 3254 peter_e@gmx.net           553         [ #  # ]:              0 :         ereport(WARNING,
                                554                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                555                 :                :                  errmsg("out of background worker slots"),
                                556                 :                :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
 1073 akapila@postgresql.o      557                 :              0 :         return false;
                                558                 :                :     }
                                559                 :                : 
                                560                 :                :     /* Now wait until it attaches. */
 1073 akapila@postgresql.o      561                 :CBC         418 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
                                562                 :                : }
                                563                 :                : 
                                564                 :                : /*
                                565                 :                :  * Internal function to stop the worker and wait until it detaches from the
                                566                 :                :  * slot.
                                567                 :                :  */
                                568                 :                : static void
                                569                 :             84 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
                                570                 :                : {
                                571                 :                :     uint16      generation;
                                572                 :                : 
                                573         [ -  + ]:             84 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
                                574                 :                : 
                                575                 :                :     /*
                                576                 :                :      * Remember which generation was our worker so we can check if what we see
                                577                 :                :      * is still the same one.
                                578                 :                :      */
 3157 peter_e@gmx.net           579                 :             84 :     generation = worker->generation;
                                580                 :                : 
                                581                 :                :     /*
                                582                 :                :      * If we found a worker but it does not have proc set then it is still
                                583                 :                :      * starting up; wait for it to finish starting and then kill it.
                                584                 :                :      */
                                585   [ +  -  +  + ]:             84 :     while (worker->in_use && !worker->proc)
                                586                 :                :     {
                                587                 :                :         int         rc;
                                588                 :                : 
 3254                           589                 :              1 :         LWLockRelease(LogicalRepWorkerLock);
                                590                 :                : 
                                591                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3116 andres@anarazel.de        592                 :              1 :         rc = WaitLatch(MyLatch,
                                593                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                594                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                595                 :                : 
                                596         [ -  + ]:              1 :         if (rc & WL_LATCH_SET)
                                597                 :                :         {
 3116 andres@anarazel.de        598                 :UBC           0 :             ResetLatch(MyLatch);
                                599         [ #  # ]:              0 :             CHECK_FOR_INTERRUPTS();
                                600                 :                :         }
                                601                 :                : 
                                602                 :                :         /* Recheck worker status. */
 3254 peter_e@gmx.net           603                 :CBC           1 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                604                 :                : 
                                605                 :                :         /*
                                606                 :                :          * Check whether the worker slot is no longer used, which would mean
                                607                 :                :          * that the worker has exited, or whether the worker generation is
                                608                 :                :          * different, meaning that a different worker has taken the slot.
                                609                 :                :          */
 3157                           610   [ +  -  -  + ]:              1 :         if (!worker->in_use || worker->generation != generation)
 3250 peter_e@gmx.net           611                 :UBC           0 :             return;
                                612                 :                : 
                                613                 :                :         /* Worker has assigned proc, so it has started. */
 3250 peter_e@gmx.net           614         [ +  - ]:CBC           1 :         if (worker->proc)
 3254                           615                 :              1 :             break;
                                616                 :                :     }
                                617                 :                : 
                                618                 :                :     /* Now terminate the worker ... */
 1073 akapila@postgresql.o      619                 :             84 :     kill(worker->proc->pid, signo);
                                620                 :                : 
                                621                 :                :     /* ... and wait for it to die. */
                                622                 :                :     for (;;)
 3254 peter_e@gmx.net           623                 :            112 :     {
                                624                 :                :         int         rc;
                                625                 :                : 
                                626                 :                :         /* is it gone? */
 3157                           627   [ +  +  +  + ]:            196 :         if (!worker->proc || worker->generation != generation)
                                628                 :                :             break;
                                629                 :                : 
 3091 tgl@sss.pgh.pa.us         630                 :            112 :         LWLockRelease(LogicalRepWorkerLock);
                                631                 :                : 
                                632                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3116 andres@anarazel.de        633                 :            112 :         rc = WaitLatch(MyLatch,
                                634                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                635                 :                :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
                                636                 :                : 
                                637         [ +  + ]:            112 :         if (rc & WL_LATCH_SET)
                                638                 :                :         {
                                639                 :             36 :             ResetLatch(MyLatch);
                                640         [ +  + ]:             36 :             CHECK_FOR_INTERRUPTS();
                                641                 :                :         }
                                642                 :                : 
 3091 tgl@sss.pgh.pa.us         643                 :            112 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                644                 :                :     }
                                645                 :                : }
                                646                 :                : 
                                647                 :                : /*
                                648                 :                :  * Stop the logical replication worker that matches the specified worker type,
                                649                 :                :  * subscription id, and relation id.
                                650                 :                :  */
                                651                 :                : void
   50 akapila@postgresql.o      652                 :GNC          97 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
                                653                 :                : {
                                654                 :                :     LogicalRepWorker *worker;
                                655                 :                : 
                                656                 :                :     /* relid must be valid only for table sync workers */
                                657         [ -  + ]:             97 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
                                658                 :                : 
 1073 akapila@postgresql.o      659                 :CBC          97 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                660                 :                : 
   50 akapila@postgresql.o      661                 :GNC          97 :     worker = logicalrep_worker_find(wtype, subid, relid, false);
                                662                 :                : 
 1073 akapila@postgresql.o      663         [ +  + ]:CBC          97 :     if (worker)
                                664                 :                :     {
                                665   [ +  -  -  + ]:             76 :         Assert(!isParallelApplyWorker(worker));
                                666                 :             76 :         logicalrep_worker_stop_internal(worker, SIGTERM);
                                667                 :                :     }
                                668                 :                : 
                                669                 :             97 :     LWLockRelease(LogicalRepWorkerLock);
                                670                 :             97 : }
                                671                 :                : 
                                672                 :                : /*
                                673                 :                :  * Stop the given logical replication parallel apply worker.
                                674                 :                :  *
                                675                 :                :  * Note that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
                                676                 :                :  * worker so that the worker exits cleanly.
                                677                 :                :  */
                                678                 :                : void
  953                           679                 :              5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
                                680                 :                : {
                                681                 :                :     int         slot_no;
                                682                 :                :     uint16      generation;
                                683                 :                :     LogicalRepWorker *worker;
                                684                 :                : 
                                685         [ -  + ]:              5 :     SpinLockAcquire(&winfo->shared->mutex);
                                686                 :              5 :     generation = winfo->shared->logicalrep_worker_generation;
                                687                 :              5 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
                                688                 :              5 :     SpinLockRelease(&winfo->shared->mutex);
                                689                 :                : 
 1073                           690   [ +  -  -  + ]:              5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
                                691                 :                : 
                                692                 :                :     /*
                                693                 :                :      * Detach from the error_mq_handle for the parallel apply worker before
                                694                 :                :      * stopping it. This prevents the leader apply worker from trying to
                                695                 :                :      * receive the message from the error queue that might already be detached
                                696                 :                :      * by the parallel apply worker.
                                697                 :                :      */
  953                           698         [ +  - ]:              5 :     if (winfo->error_mq_handle)
                                699                 :                :     {
                                700                 :              5 :         shm_mq_detach(winfo->error_mq_handle);
                                701                 :              5 :         winfo->error_mq_handle = NULL;
                                702                 :                :     }
                                703                 :                : 
 1073                           704                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                705                 :                : 
                                706                 :              5 :     worker = &LogicalRepCtx->workers[slot_no];
                                707   [ +  -  -  + ]:              5 :     Assert(isParallelApplyWorker(worker));
                                708                 :                : 
                                709                 :                :     /*
                                710                 :                :      * Only stop the worker if the generation matches and the worker is alive.
                                711                 :                :      */
                                712   [ +  -  +  - ]:              5 :     if (worker->generation == generation && worker->proc)
   84                           713                 :              5 :         logicalrep_worker_stop_internal(worker, SIGUSR2);
                                714                 :                : 
 3091 tgl@sss.pgh.pa.us         715                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
 3254 peter_e@gmx.net           716                 :              5 : }
                                717                 :                : 
                                718                 :                : /*
                                719                 :                :  * Wake up (using latch) any logical replication worker that matches the
                                720                 :                :  * specified worker type, subscription id, and relation id.
                                721                 :                :  */
                                722                 :                : void
   50 akapila@postgresql.o      723                 :GNC         212 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
                                724                 :                : {
                                725                 :                :     LogicalRepWorker *worker;
                                726                 :                : 
                                727                 :                :     /* relid must be valid only for table sync workers */
                                728         [ -  + ]:            212 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
                                729                 :                : 
 3191 peter_e@gmx.net           730                 :CBC         212 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                731                 :                : 
   50 akapila@postgresql.o      732                 :GNC         212 :     worker = logicalrep_worker_find(wtype, subid, relid, true);
                                733                 :                : 
 3191 peter_e@gmx.net           734         [ +  - ]:CBC         212 :     if (worker)
                                735                 :            212 :         logicalrep_worker_wakeup_ptr(worker);
                                736                 :                : 
 3092 tgl@sss.pgh.pa.us         737                 :            212 :     LWLockRelease(LogicalRepWorkerLock);
 3191 peter_e@gmx.net           738                 :            212 : }
                                739                 :                : 
                                740                 :                : /*
                                741                 :                :  * Wake up (using latch) the specified logical replication worker.
                                742                 :                :  *
                                743                 :                :  * Caller must hold lock, else worker->proc could change under us.
                                744                 :                :  */
                                745                 :                : void
                                746                 :            653 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
                                747                 :                : {
 3092 tgl@sss.pgh.pa.us         748         [ -  + ]:            653 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                749                 :                : 
 3191 peter_e@gmx.net           750                 :            653 :     SetLatch(&worker->proc->procLatch);
                                751                 :            653 : }
                                752                 :                : 
                                753                 :                : /*
                                754                 :                :  * Attach to a slot.
                                755                 :                :  */
                                756                 :                : void
 3254                           757                 :            544 : logicalrep_worker_attach(int slot)
                                758                 :                : {
                                759                 :                :     /* Block concurrent access. */
                                760                 :            544 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                761                 :                : 
                                762   [ +  -  -  + ]:            544 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
                                763                 :            544 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
                                764                 :                : 
 3157                           765         [ -  + ]:            544 :     if (!MyLogicalRepWorker->in_use)
                                766                 :                :     {
 3157 peter_e@gmx.net           767                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                768         [ #  # ]:              0 :         ereport(ERROR,
                                769                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                770                 :                :                  errmsg("logical replication worker slot %d is empty, cannot attach",
                                771                 :                :                         slot)));
                                772                 :                :     }
                                773                 :                : 
 3254 peter_e@gmx.net           774         [ -  + ]:CBC         544 :     if (MyLogicalRepWorker->proc)
                                775                 :                :     {
 3157 peter_e@gmx.net           776                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3254                           777         [ #  # ]:              0 :         ereport(ERROR,
                                778                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                779                 :                :                  errmsg("logical replication worker slot %d is already used by "
                                780                 :                :                         "another worker, cannot attach", slot)));
                                781                 :                :     }
                                782                 :                : 
 3254 peter_e@gmx.net           783                 :CBC         544 :     MyLogicalRepWorker->proc = MyProc;
                                784                 :            544 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
                                785                 :                : 
                                786                 :            544 :     LWLockRelease(LogicalRepWorkerLock);
                                787                 :            544 : }
                                788                 :                : 
                                789                 :                : /*
                                790                 :                :  * Stop the parallel apply workers if any, and detach the leader apply worker
                                791                 :                :  * (cleans up the worker info).
                                792                 :                :  */
                                793                 :                : static void
                                794                 :            544 : logicalrep_worker_detach(void)
                                795                 :                : {
                                796                 :                :     /* Stop the parallel apply workers. */
 1073 akapila@postgresql.o      797         [ +  + ]:            544 :     if (am_leader_apply_worker())
                                798                 :                :     {
                                799                 :                :         List       *workers;
                                800                 :                :         ListCell   *lc;
                                801                 :                : 
                                802                 :                :         /*
                                803                 :                :          * Detach from the error_mq_handle for all parallel apply workers
                                804                 :                :          * before terminating them. This prevents the leader apply worker from
                                805                 :                :          * receiving the worker termination message and sending it to logs
                                806                 :                :          * when the same is already done by the parallel worker.
                                807                 :                :          */
                                808                 :            325 :         pa_detach_all_error_mq();
                                809                 :                : 
                                810                 :            325 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                811                 :                : 
  511                           812                 :            325 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
 1073                           813   [ +  -  +  +  :            656 :         foreach(lc, workers)
                                              +  + ]
                                814                 :                :         {
                                815                 :            331 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
                                816                 :                : 
                                817   [ +  -  +  + ]:            331 :             if (isParallelApplyWorker(w))
                                818                 :              3 :                 logicalrep_worker_stop_internal(w, SIGTERM);
                                819                 :                :         }
                                820                 :                : 
                                821                 :            325 :         LWLockRelease(LogicalRepWorkerLock);
                                822                 :                : 
  137 tgl@sss.pgh.pa.us         823                 :GNC         325 :         list_free(workers);
                                824                 :                :     }
                                825                 :                : 
                                826                 :                :     /* Block concurrent access. */
 3254 peter_e@gmx.net           827                 :CBC         544 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                828                 :                : 
 3157                           829                 :            544 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
                                830                 :                : 
 3254                           831                 :            544 :     LWLockRelease(LogicalRepWorkerLock);
                                832                 :            544 : }
                                833                 :                : 
                                834                 :                : /*
                                835                 :                :  * Clean up worker info.
                                836                 :                :  */
                                837                 :                : static void
 3157                           838                 :            544 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
                                839                 :                : {
                                840         [ -  + ]:            544 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
                                841                 :                : 
  845 akapila@postgresql.o      842                 :            544 :     worker->type = WORKERTYPE_UNKNOWN;
 3157 peter_e@gmx.net           843                 :            544 :     worker->in_use = false;
                                844                 :            544 :     worker->proc = NULL;
                                845                 :            544 :     worker->dbid = InvalidOid;
                                846                 :            544 :     worker->userid = InvalidOid;
                                847                 :            544 :     worker->subid = InvalidOid;
                                848                 :            544 :     worker->relid = InvalidOid;
 1064 akapila@postgresql.o      849                 :            544 :     worker->leader_pid = InvalidPid;
 1073                           850                 :            544 :     worker->parallel_apply = false;
 3157 peter_e@gmx.net           851                 :            544 : }
                                852                 :                : 
                                853                 :                : /*
                                854                 :                :  * Cleanup function for logical replication launcher.
                                855                 :                :  *
                                856                 :                :  * Called on logical replication launcher exit.
                                857                 :                :  */
                                858                 :                : static void
 3164 fujii@postgresql.org      859                 :            413 : logicalrep_launcher_onexit(int code, Datum arg)
                                860                 :                : {
                                861                 :            413 :     LogicalRepCtx->launcher_pid = 0;
                                862                 :            413 : }
                                863                 :                : 
                                864                 :                : /*
                                865                 :                :  * Reset the last_seqsync_start_time of the sequencesync worker in the
                                866                 :                :  * subscription's apply worker.
                                867                 :                :  *
                                868                 :                :  * Note that this value is not stored in the sequencesync worker, because that
                                869                 :                :  * has finished already and is about to exit.
                                870                 :                :  */
                                871                 :                : void
   42 akapila@postgresql.o      872                 :GNC           5 : logicalrep_reset_seqsync_start_time(void)
                                873                 :                : {
                                874                 :                :     LogicalRepWorker *worker;
                                875                 :                : 
                                876                 :                :     /*
                                877                 :                :      * The apply worker can't access last_seqsync_start_time concurrently, so
                                878                 :                :      * it is okay to use SHARED lock here. See ProcessSequencesForSync().
                                879                 :                :      */
                                880                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                881                 :                : 
                                882                 :              5 :     worker = logicalrep_worker_find(WORKERTYPE_APPLY,
                                883                 :              5 :                                     MyLogicalRepWorker->subid, InvalidOid,
                                884                 :                :                                     true);
                                885         [ +  - ]:              5 :     if (worker)
                                886                 :              5 :         worker->last_seqsync_start_time = 0;
                                887                 :                : 
                                888                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
                                889                 :              5 : }
                                890                 :                : 
                                891                 :                : /*
                                892                 :                :  * Cleanup function.
                                893                 :                :  *
                                894                 :                :  * Called on logical replication worker exit.
                                895                 :                :  */
                                896                 :                : static void
 3254 peter_e@gmx.net           897                 :CBC         544 : logicalrep_worker_onexit(int code, Datum arg)
                                898                 :                : {
                                899                 :                :     /* Disconnect gracefully from the remote side. */
 1680 alvherre@alvh.no-ip.      900         [ +  + ]:            544 :     if (LogRepWorkerWalRcvConn)
                                901                 :            431 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
                                902                 :                : 
 3254 peter_e@gmx.net           903                 :            544 :     logicalrep_worker_detach();
                                904                 :                : 
                                905                 :                :     /* Cleanup fileset used for streaming transactions. */
 1567 akapila@postgresql.o      906         [ +  + ]:            544 :     if (MyLogicalRepWorker->stream_fileset != NULL)
                                907                 :             14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
                                908                 :                : 
                                909                 :                :     /*
                                910                 :                :      * Session level locks may be acquired outside of a transaction in
                                911                 :                :      * parallel apply mode and will not be released when the worker
                                912                 :                :      * terminates, so manually release all locks before the worker exits.
                                913                 :                :      *
                                914                 :                :      * The locks will be acquired once the worker is initialized.
                                915                 :                :      */
  959                           916         [ +  + ]:            544 :     if (!InitializingApplyWorker)
                                917                 :            479 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
                                918                 :                : 
 3121 peter_e@gmx.net           919                 :            544 :     ApplyLauncherWakeup();
 3254                           920                 :            544 : }
                                921                 :                : 
                                922                 :                : /*
                                923                 :                :  * Count the number of registered (not necessarily running) sync workers
                                924                 :                :  * for a subscription.
                                925                 :                :  */
                                926                 :                : int
 3191                           927                 :           1327 : logicalrep_sync_worker_count(Oid subid)
                                928                 :                : {
                                929                 :                :     int         i;
 3136 bruce@momjian.us          930                 :           1327 :     int         res = 0;
                                931                 :                : 
 3191 peter_e@gmx.net           932         [ -  + ]:           1327 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                933                 :                : 
                                934                 :                :     /* Search for attached worker for a given subscription id. */
                                935         [ +  + ]:           6841 :     for (i = 0; i < max_logical_replication_workers; i++)
                                936                 :                :     {
 3136 bruce@momjian.us          937                 :           5514 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                938                 :                : 
   42 akapila@postgresql.o      939   [ +  +  +  -  :GNC        5514 :         if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
                                     +  +  +  -  -  
                                                 + ]
 3191 peter_e@gmx.net           940                 :CBC        1510 :             res++;
                                941                 :                :     }
                                942                 :                : 
                                943                 :           1327 :     return res;
                                944                 :                : }
                                945                 :                : 
                                946                 :                : /*
                                947                 :                :  * Count the number of registered (but not necessarily running) parallel apply
                                948                 :                :  * workers for a subscription.
                                949                 :                :  */
                                950                 :                : static int
 1073 akapila@postgresql.o      951                 :            418 : logicalrep_pa_worker_count(Oid subid)
                                952                 :                : {
                                953                 :                :     int         i;
                                954                 :            418 :     int         res = 0;
                                955                 :                : 
                                956         [ -  + ]:            418 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                957                 :                : 
                                958                 :                :     /*
                                959                 :                :      * Scan all attached parallel apply workers, only counting those which
                                960                 :                :      * have the given subscription id.
                                961                 :                :      */
                                962         [ +  + ]:           2214 :     for (i = 0; i < max_logical_replication_workers; i++)
                                963                 :                :     {
                                964                 :           1796 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                965                 :                : 
  845                           966   [ +  +  +  +  :           1796 :         if (isParallelApplyWorker(w) && w->subid == subid)
                                              +  - ]
 1073                           967                 :              2 :             res++;
                                968                 :                :     }
                                969                 :                : 
                                970                 :            418 :     return res;
                                971                 :                : }
                                972                 :                : 
                                973                 :                : /*
                                974                 :                :  * ApplyLauncherShmemSize
                                975                 :                :  *      Compute space needed for replication launcher shared memory
                                976                 :                :  */
                                977                 :                : Size
 3254 peter_e@gmx.net           978                 :           4124 : ApplyLauncherShmemSize(void)
                                979                 :                : {
                                980                 :                :     Size        size;
                                981                 :                : 
                                982                 :                :     /*
                                983                 :                :      * Need the fixed struct and the array of LogicalRepWorker.
                                984                 :                :      */
                                985                 :           4124 :     size = sizeof(LogicalRepCtxStruct);
                                986                 :           4124 :     size = MAXALIGN(size);
                                987                 :           4124 :     size = add_size(size, mul_size(max_logical_replication_workers,
                                988                 :                :                                    sizeof(LogicalRepWorker)));
                                989                 :           4124 :     return size;
                                990                 :                : }
                                991                 :                : 
                                992                 :                : /*
                                993                 :                :  * ApplyLauncherRegister
                                994                 :                :  *      Register a background worker running the logical replication launcher.
                                995                 :                :  */
                                996                 :                : void
                                997                 :            849 : ApplyLauncherRegister(void)
                                998                 :                : {
                                999                 :                :     BackgroundWorker bgw;
                               1000                 :                : 
                               1001                 :                :     /*
                               1002                 :                :      * The logical replication launcher is disabled during binary upgrades, to
                               1003                 :                :      * prevent logical replication workers from running on the source cluster.
                               1004                 :                :      * That could cause replication origins to move forward after having been
                               1005                 :                :      * copied to the target cluster, potentially creating conflicts with the
                               1006                 :                :      * copied data files.
                               1007                 :                :      */
  705 michael@paquier.xyz      1008   [ +  +  +  + ]:            849 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
 3254 peter_e@gmx.net          1009                 :             57 :         return;
                               1010                 :                : 
 3167 tgl@sss.pgh.pa.us        1011                 :            792 :     memset(&bgw, 0, sizeof(bgw));
 3136 bruce@momjian.us         1012                 :            792 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                               1013                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3254 peter_e@gmx.net          1014                 :            792 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  898 nathan@postgresql.or     1015                 :            792 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 3183 rhaas@postgresql.org     1016                 :            792 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 3254 peter_e@gmx.net          1017                 :            792 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
                               1018                 :                :              "logical replication launcher");
 3030                          1019                 :            792 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
                               1020                 :                :              "logical replication launcher");
 3254                          1021                 :            792 :     bgw.bgw_restart_time = 5;
                               1022                 :            792 :     bgw.bgw_notify_pid = 0;
                               1023                 :            792 :     bgw.bgw_main_arg = (Datum) 0;
                               1024                 :                : 
                               1025                 :            792 :     RegisterBackgroundWorker(&bgw);
                               1026                 :                : }
                               1027                 :                : 
                               1028                 :                : /*
                               1029                 :                :  * ApplyLauncherShmemInit
                               1030                 :                :  *      Allocate and initialize replication launcher shared memory
                               1031                 :                :  */
                               1032                 :                : void
                               1033                 :           1069 : ApplyLauncherShmemInit(void)
                               1034                 :                : {
                               1035                 :                :     bool        found;
                               1036                 :                : 
                               1037                 :           1069 :     LogicalRepCtx = (LogicalRepCtxStruct *)
                               1038                 :           1069 :         ShmemInitStruct("Logical Replication Launcher Data",
                               1039                 :                :                         ApplyLauncherShmemSize(),
                               1040                 :                :                         &found);
                               1041                 :                : 
                               1042         [ +  - ]:           1069 :     if (!found)
                               1043                 :                :     {
                               1044                 :                :         int         slot;
                               1045                 :                : 
                               1046                 :           1069 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
                               1047                 :                : 
 1057 tgl@sss.pgh.pa.us        1048                 :           1069 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
                               1049                 :           1069 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
                               1050                 :                : 
                               1051                 :                :         /* Initialize memory and spin locks for each worker slot. */
 3191 peter_e@gmx.net          1052         [ +  + ]:           5308 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
                               1053                 :                :         {
                               1054                 :           4239 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
                               1055                 :                : 
                               1056                 :           4239 :             memset(worker, 0, sizeof(LogicalRepWorker));
                               1057                 :           4239 :             SpinLockInit(&worker->relmutex);
                               1058                 :                :         }
                               1059                 :                :     }
 3254                          1060                 :           1069 : }
                               1061                 :                : 
                               1062                 :                : /*
                               1063                 :                :  * Initialize or attach to the dynamic shared hash table that stores the
                               1064                 :                :  * last-start times, if not already done.
                               1065                 :                :  * This must be called before accessing the table.
                               1066                 :                :  */
                               1067                 :                : static void
 1060 tgl@sss.pgh.pa.us        1068                 :            720 : logicalrep_launcher_attach_dshmem(void)
                               1069                 :                : {
                               1070                 :                :     MemoryContext oldcontext;
                               1071                 :                : 
                               1072                 :                :     /* Quick exit if we already did this. */
 1057                          1073         [ +  + ]:            720 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
 1060                          1074         [ +  + ]:            667 :         last_start_times != NULL)
                               1075                 :            475 :         return;
                               1076                 :                : 
                               1077                 :                :     /* Otherwise, use a lock to ensure only one process creates the table. */
                               1078                 :            245 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                               1079                 :                : 
                               1080                 :                :     /* Be sure any local memory allocated by DSA routines is persistent. */
                               1081                 :            245 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                               1082                 :                : 
 1057                          1083         [ +  + ]:            245 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
                               1084                 :                :     {
                               1085                 :                :         /* Initialize dynamic shared hash table for last-start times. */
 1060                          1086                 :             53 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
                               1087                 :             53 :         dsa_pin(last_start_times_dsa);
                               1088                 :             53 :         dsa_pin_mapping(last_start_times_dsa);
  660 nathan@postgresql.or     1089                 :             53 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
                               1090                 :                : 
                               1091                 :                :         /* Store handles in shared memory for other backends to use. */
 1060 tgl@sss.pgh.pa.us        1092                 :             53 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
                               1093                 :             53 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
                               1094                 :                :     }
                               1095         [ +  - ]:            192 :     else if (!last_start_times)
                               1096                 :                :     {
                               1097                 :                :         /* Attach to existing dynamic shared hash table. */
                               1098                 :            192 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
                               1099                 :            192 :         dsa_pin_mapping(last_start_times_dsa);
                               1100                 :            192 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
  194 nathan@postgresql.or     1101                 :            192 :                                          LogicalRepCtx->last_start_dsh, NULL);
                               1102                 :                :     }
                               1103                 :                : 
 1060 tgl@sss.pgh.pa.us        1104                 :            245 :     MemoryContextSwitchTo(oldcontext);
                               1105                 :            245 :     LWLockRelease(LogicalRepWorkerLock);
                               1106                 :                : }
                               1107                 :                : 
                               1108                 :                : /*
                               1109                 :                :  * Set the last-start time for the subscription.
                               1110                 :                :  */
                               1111                 :                : static void
                               1112                 :            201 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
                               1113                 :                : {
                               1114                 :                :     LauncherLastStartTimesEntry *entry;
                               1115                 :                :     bool        found;
                               1116                 :                : 
                               1117                 :            201 :     logicalrep_launcher_attach_dshmem();
                               1118                 :                : 
                               1119                 :            201 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
                               1120                 :            201 :     entry->last_start_time = start_time;
                               1121                 :            201 :     dshash_release_lock(last_start_times, entry);
                               1122                 :            201 : }
                               1123                 :                : 
                               1124                 :                : /*
                               1125                 :                :  * Return the last-start time for the subscription, or 0 if there isn't one.
                               1126                 :                :  */
                               1127                 :                : static TimestampTz
                               1128                 :            288 : ApplyLauncherGetWorkerStartTime(Oid subid)
                               1129                 :                : {
                               1130                 :                :     LauncherLastStartTimesEntry *entry;
                               1131                 :                :     TimestampTz ret;
                               1132                 :                : 
                               1133                 :            288 :     logicalrep_launcher_attach_dshmem();
                               1134                 :                : 
                               1135                 :            288 :     entry = dshash_find(last_start_times, &subid, false);
                               1136         [ +  + ]:            288 :     if (entry == NULL)
                               1137                 :            124 :         return 0;
                               1138                 :                : 
                               1139                 :            164 :     ret = entry->last_start_time;
                               1140                 :            164 :     dshash_release_lock(last_start_times, entry);
                               1141                 :                : 
                               1142                 :            164 :     return ret;
                               1143                 :                : }
                               1144                 :                : 
                               1145                 :                : /*
                               1146                 :                :  * Remove the last-start-time entry for the subscription, if one exists.
                               1147                 :                :  *
                               1148                 :                :  * This has two use-cases: to remove the entry related to a subscription
                               1149                 :                :  * that's been deleted or disabled (just to avoid leaking shared memory),
                               1150                 :                :  * and to allow immediate restart of an apply worker that has exited
                               1151                 :                :  * due to subscription parameter changes.
                               1152                 :                :  */
                               1153                 :                : void
                               1154                 :            231 : ApplyLauncherForgetWorkerStartTime(Oid subid)
                               1155                 :                : {
                               1156                 :            231 :     logicalrep_launcher_attach_dshmem();
                               1157                 :                : 
                               1158                 :            231 :     (void) dshash_delete_key(last_start_times, &subid);
                               1159                 :            231 : }
                               1160                 :                : 
                               1161                 :                : /*
                               1162                 :                :  * Wakeup the launcher on commit if requested.
                               1163                 :                :  */
                               1164                 :                : void
 3152 peter_e@gmx.net          1165                 :         331830 : AtEOXact_ApplyLauncher(bool isCommit)
                               1166                 :                : {
 3057                          1167         [ +  + ]:         331830 :     if (isCommit)
                               1168                 :                :     {
                               1169         [ +  + ]:         305452 :         if (on_commit_launcher_wakeup)
                               1170                 :            143 :             ApplyLauncherWakeup();
                               1171                 :                :     }
                               1172                 :                : 
 3152                          1173                 :         331830 :     on_commit_launcher_wakeup = false;
 3254                          1174                 :         331830 : }
                               1175                 :                : 
                               1176                 :                : /*
                               1177                 :                :  * Request wakeup of the launcher on commit of the transaction.
                               1178                 :                :  *
                               1179                 :                :  * This is used to send launcher signal to stop sleeping and process the
                               1180                 :                :  * subscriptions when current transaction commits. Should be used when new
                               1181                 :                :  * tuple was added to the pg_subscription catalog.
                               1182                 :                : */
                               1183                 :                : void
                               1184                 :            144 : ApplyLauncherWakeupAtCommit(void)
                               1185                 :                : {
 3236 heikki.linnakangas@i     1186         [ +  + ]:            144 :     if (!on_commit_launcher_wakeup)
                               1187                 :            143 :         on_commit_launcher_wakeup = true;
 3254 peter_e@gmx.net          1188                 :            144 : }
                               1189                 :                : 
                               1190                 :                : /*
                               1191                 :                :  * Wakeup the launcher immediately.
                               1192                 :                :  */
                               1193                 :                : void
                               1194                 :            721 : ApplyLauncherWakeup(void)
                               1195                 :                : {
 3164 fujii@postgresql.org     1196         [ +  + ]:            721 :     if (LogicalRepCtx->launcher_pid != 0)
 3254 peter_e@gmx.net          1197                 :            697 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
                               1198                 :            721 : }
                               1199                 :                : 
                               1200                 :                : /*
                               1201                 :                :  * Main loop for the apply launcher process.
                               1202                 :                :  */
                               1203                 :                : void
                               1204                 :            413 : ApplyLauncherMain(Datum main_arg)
                               1205                 :                : {
 3204 tgl@sss.pgh.pa.us        1206         [ +  + ]:            413 :     ereport(DEBUG1,
                               1207                 :                :             (errmsg_internal("logical replication launcher started")));
                               1208                 :                : 
 3164 fujii@postgresql.org     1209                 :            413 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
                               1210                 :                : 
 3114 andres@anarazel.de       1211         [ -  + ]:            413 :     Assert(LogicalRepCtx->launcher_pid == 0);
                               1212                 :            413 :     LogicalRepCtx->launcher_pid = MyProcPid;
                               1213                 :                : 
                               1214                 :                :     /* Establish signal handlers. */
 1979 akapila@postgresql.o     1215                 :            413 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
 3114 andres@anarazel.de       1216                 :            413 :     pqsignal(SIGTERM, die);
 3254 peter_e@gmx.net          1217                 :            413 :     BackgroundWorkerUnblockSignals();
                               1218                 :                : 
                               1219                 :                :     /*
                               1220                 :                :      * Establish connection to nailed catalogs (we only ever access
                               1221                 :                :      * pg_subscription).
                               1222                 :                :      */
 2813 magnus@hagander.net      1223                 :            413 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
                               1224                 :                : 
                               1225                 :                :     /*
                               1226                 :                :      * Acquire the conflict detection slot at startup to ensure it can be
                               1227                 :                :      * dropped if no longer needed after a restart.
                               1228                 :                :      */
  147 akapila@postgresql.o     1229                 :GNC         413 :     acquire_conflict_slot_if_exists();
                               1230                 :                : 
                               1231                 :                :     /* Enter main loop */
                               1232                 :                :     for (;;)
 3254 peter_e@gmx.net          1233                 :CBC        2376 :     {
                               1234                 :                :         int         rc;
                               1235                 :                :         List       *sublist;
                               1236                 :                :         ListCell   *lc;
                               1237                 :                :         MemoryContext subctx;
                               1238                 :                :         MemoryContext oldctx;
 3136 bruce@momjian.us         1239                 :           2789 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
  106 akapila@postgresql.o     1240                 :GNC        2789 :         bool        can_update_xmin = true;
  147                          1241                 :           2789 :         bool        retain_dead_tuples = false;
                               1242                 :           2789 :         TransactionId xmin = InvalidTransactionId;
                               1243                 :                : 
 3114 andres@anarazel.de       1244         [ -  + ]:CBC        2789 :         CHECK_FOR_INTERRUPTS();
                               1245                 :                : 
                               1246                 :                :         /* Use temporary context to avoid leaking memory across cycles. */
 1060 tgl@sss.pgh.pa.us        1247                 :           2789 :         subctx = AllocSetContextCreate(TopMemoryContext,
                               1248                 :                :                                        "Logical Replication Launcher sublist",
                               1249                 :                :                                        ALLOCSET_DEFAULT_SIZES);
                               1250                 :           2789 :         oldctx = MemoryContextSwitchTo(subctx);
                               1251                 :                : 
                               1252                 :                :         /*
                               1253                 :                :          * Start any missing workers for enabled subscriptions.
                               1254                 :                :          *
                               1255                 :                :          * Also, during the iteration through all subscriptions, we compute
                               1256                 :                :          * the minimum XID required to protect deleted tuples for conflict
                               1257                 :                :          * detection if any subscription enables the retain_dead_tuples
                               1258                 :                :          * option.
                               1259                 :                :          */
                               1260                 :           2789 :         sublist = get_subscription_list();
                               1261   [ +  +  +  +  :           3614 :         foreach(lc, sublist)
                                              +  + ]
                               1262                 :                :         {
                               1263                 :            827 :             Subscription *sub = (Subscription *) lfirst(lc);
                               1264                 :                :             LogicalRepWorker *w;
                               1265                 :                :             TimestampTz last_start;
                               1266                 :                :             TimestampTz now;
                               1267                 :                :             long        elapsed;
                               1268                 :                : 
  147 akapila@postgresql.o     1269         [ +  + ]:GNC         827 :             if (sub->retaindeadtuples)
                               1270                 :                :             {
                               1271                 :              7 :                 retain_dead_tuples = true;
                               1272                 :                : 
                               1273                 :                :                 /*
                               1274                 :                :                  * Create a replication slot to retain information necessary
                               1275                 :                :                  * for conflict detection such as dead tuples, commit
                               1276                 :                :                  * timestamps, and origins.
                               1277                 :                :                  *
                               1278                 :                :                  * The slot is created before starting the apply worker to
                               1279                 :                :                  * prevent it from unnecessarily maintaining its
                               1280                 :                :                  * oldest_nonremovable_xid.
                               1281                 :                :                  *
                               1282                 :                :                  * The slot is created even for a disabled subscription to
                               1283                 :                :                  * ensure that conflict-related information is available when
                               1284                 :                :                  * applying remote changes that occurred before the
                               1285                 :                :                  * subscription was enabled.
                               1286                 :                :                  */
                               1287                 :              7 :                 CreateConflictDetectionSlot();
                               1288                 :                : 
  106                          1289         [ +  - ]:              7 :                 if (sub->retentionactive)
                               1290                 :                :                 {
                               1291                 :                :                     /*
                               1292                 :                :                      * Can't advance xmin of the slot unless all the
                               1293                 :                :                      * subscriptions actively retaining dead tuples are
                               1294                 :                :                      * enabled. This is required to ensure that we don't
                               1295                 :                :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
                               1296                 :                :                      * the subscriptions is not enabled. Otherwise, we won't
                               1297                 :                :                      * be able to detect conflicts reliably for such a
                               1298                 :                :                      * subscription even though it has set the
                               1299                 :                :                      * retain_dead_tuples option.
                               1300                 :                :                      */
                               1301                 :              7 :                     can_update_xmin &= sub->enabled;
                               1302                 :                : 
                               1303                 :                :                     /*
                               1304                 :                :                      * Initialize the slot once the subscription activates
                               1305                 :                :                      * retention.
                               1306                 :                :                      */
                               1307         [ -  + ]:              7 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
  106 akapila@postgresql.o     1308                 :UNC           0 :                         init_conflict_slot_xmin();
                               1309                 :                :                 }
                               1310                 :                :             }
                               1311                 :                : 
 1060 tgl@sss.pgh.pa.us        1312         [ +  + ]:CBC         827 :             if (!sub->enabled)
                               1313                 :             44 :                 continue;
                               1314                 :                : 
                               1315                 :            783 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
   50 akapila@postgresql.o     1316                 :GNC         783 :             w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
                               1317                 :                :                                        false);
                               1318                 :                : 
 1060 tgl@sss.pgh.pa.us        1319         [ +  + ]:CBC         783 :             if (w != NULL)
                               1320                 :                :             {
                               1321                 :                :                 /*
                               1322                 :                :                  * Compute the minimum xmin required to protect dead tuples
                               1323                 :                :                  * required for conflict detection among all running apply
                               1324                 :                :                  * workers. This computation is performed while holding
                               1325                 :                :                  * LogicalRepWorkerLock to prevent accessing invalid worker
                               1326                 :                :                  * data, in scenarios where a worker might exit and reset its
                               1327                 :                :                  * state concurrently.
                               1328                 :                :                  */
  106 akapila@postgresql.o     1329         [ +  + ]:GNC         495 :                 if (sub->retaindeadtuples &&
                               1330   [ +  -  +  - ]:              3 :                     sub->retentionactive &&
                               1331                 :                :                     can_update_xmin)
  147                          1332                 :              3 :                     compute_min_nonremovable_xid(w, &xmin);
                               1333                 :                : 
   93                          1334                 :            495 :                 LWLockRelease(LogicalRepWorkerLock);
                               1335                 :                : 
                               1336                 :                :                 /* worker is running already */
  147                          1337                 :            495 :                 continue;
                               1338                 :                :             }
                               1339                 :                : 
   93                          1340                 :            288 :             LWLockRelease(LogicalRepWorkerLock);
                               1341                 :                : 
                               1342                 :                :             /*
                               1343                 :                :              * Can't advance xmin of the slot unless all the workers
                               1344                 :                :              * corresponding to subscriptions actively retaining dead tuples
                               1345                 :                :              * are running, disabling the further computation of the minimum
                               1346                 :                :              * nonremovable xid.
                               1347                 :                :              */
  106                          1348   [ +  +  +  - ]:            288 :             if (sub->retaindeadtuples && sub->retentionactive)
                               1349                 :              1 :                 can_update_xmin = false;
                               1350                 :                : 
                               1351                 :                :             /*
                               1352                 :                :              * If the worker is eligible to start now, launch it.  Otherwise,
                               1353                 :                :              * adjust wait_time so that we'll wake up as soon as it can be
                               1354                 :                :              * started.
                               1355                 :                :              *
                               1356                 :                :              * Each subscription's apply worker can only be restarted once per
                               1357                 :                :              * wal_retrieve_retry_interval, so that errors do not cause us to
                               1358                 :                :              * repeatedly restart the worker as fast as possible.  In cases
                               1359                 :                :              * where a restart is expected (e.g., subscription parameter
                               1360                 :                :              * changes), another process should remove the last-start entry
                               1361                 :                :              * for the subscription so that the worker can be restarted
                               1362                 :                :              * without waiting for wal_retrieve_retry_interval to elapse.
                               1363                 :                :              */
 1060 tgl@sss.pgh.pa.us        1364                 :CBC         288 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
                               1365                 :            288 :             now = GetCurrentTimestamp();
                               1366         [ +  + ]:            288 :             if (last_start == 0 ||
                               1367         [ +  + ]:            164 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                               1368                 :                :             {
                               1369                 :            201 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
  176                          1370         [ +  + ]:            201 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                               1371                 :            201 :                                               sub->dbid, sub->oid, sub->name,
                               1372                 :                :                                               sub->owner, InvalidOid,
                               1373                 :                :                                               DSM_HANDLE_INVALID,
  106 akapila@postgresql.o     1374         [ +  + ]:GNC         202 :                                               sub->retaindeadtuples &&
                               1375         [ +  - ]:              1 :                                               sub->retentionactive))
                               1376                 :                :                 {
                               1377                 :                :                     /*
                               1378                 :                :                      * We get here either if we failed to launch a worker
                               1379                 :                :                      * (perhaps for resource-exhaustion reasons) or if we
                               1380                 :                :                      * launched one but it immediately quit.  Either way, it
                               1381                 :                :                      * seems appropriate to try again after
                               1382                 :                :                      * wal_retrieve_retry_interval.
                               1383                 :                :                      */
  176 tgl@sss.pgh.pa.us        1384                 :CBC           2 :                     wait_time = Min(wait_time,
                               1385                 :                :                                     wal_retrieve_retry_interval);
                               1386                 :                :                 }
                               1387                 :                :             }
                               1388                 :                :             else
                               1389                 :                :             {
 1060                          1390                 :             87 :                 wait_time = Min(wait_time,
                               1391                 :                :                                 wal_retrieve_retry_interval - elapsed);
                               1392                 :                :             }
                               1393                 :                :         }
                               1394                 :                : 
                               1395                 :                :         /*
                               1396                 :                :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
                               1397                 :                :          * that requires us to retain dead tuples. Otherwise, if required,
                               1398                 :                :          * advance the slot's xmin to protect dead tuples required for the
                               1399                 :                :          * conflict detection.
                               1400                 :                :          *
                               1401                 :                :          * Additionally, if all apply workers for subscriptions with
                               1402                 :                :          * retain_dead_tuples enabled have requested to stop retention, the
                               1403                 :                :          * slot's xmin will be set to InvalidTransactionId allowing the
                               1404                 :                :          * removal of dead tuples.
                               1405                 :                :          */
  147 akapila@postgresql.o     1406         [ +  + ]:GNC        2787 :         if (MyReplicationSlot)
                               1407                 :                :         {
                               1408         [ +  + ]:              8 :             if (!retain_dead_tuples)
                               1409                 :              1 :                 ReplicationSlotDropAcquired();
  106                          1410         [ +  + ]:              7 :             else if (can_update_xmin)
                               1411                 :              3 :                 update_conflict_slot_xmin(xmin);
                               1412                 :                :         }
                               1413                 :                : 
                               1414                 :                :         /* Switch back to original memory context. */
 1060 tgl@sss.pgh.pa.us        1415                 :CBC        2787 :         MemoryContextSwitchTo(oldctx);
                               1416                 :                :         /* Clean the temporary memory. */
                               1417                 :           2787 :         MemoryContextDelete(subctx);
                               1418                 :                : 
                               1419                 :                :         /* Wait for more work. */
 3116 andres@anarazel.de       1420                 :           2787 :         rc = WaitLatch(MyLatch,
                               1421                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1422                 :                :                        wait_time,
                               1423                 :                :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
                               1424                 :                : 
                               1425         [ +  + ]:           2784 :         if (rc & WL_LATCH_SET)
                               1426                 :                :         {
                               1427                 :           2769 :             ResetLatch(MyLatch);
                               1428         [ +  + ]:           2769 :             CHECK_FOR_INTERRUPTS();
                               1429                 :                :         }
                               1430                 :                : 
 2192 rhaas@postgresql.org     1431         [ +  + ]:           2376 :         if (ConfigReloadPending)
                               1432                 :                :         {
                               1433                 :             42 :             ConfigReloadPending = false;
 3173 peter_e@gmx.net          1434                 :             42 :             ProcessConfigFile(PGC_SIGHUP);
                               1435                 :                :         }
                               1436                 :                :     }
                               1437                 :                : 
                               1438                 :                :     /* Not reachable */
                               1439                 :                : }
                               1440                 :                : 
                               1441                 :                : /*
                               1442                 :                :  * Determine the minimum non-removable transaction ID across all apply workers
                               1443                 :                :  * for subscriptions that have retain_dead_tuples enabled. Store the result
                               1444                 :                :  * in *xmin.
                               1445                 :                :  */
                               1446                 :                : static void
  147 akapila@postgresql.o     1447                 :GNC           3 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
                               1448                 :                : {
                               1449                 :                :     TransactionId nonremovable_xid;
                               1450                 :                : 
                               1451         [ -  + ]:              3 :     Assert(worker != NULL);
                               1452                 :                : 
                               1453                 :                :     /*
                               1454                 :                :      * The replication slot for conflict detection must be created before the
                               1455                 :                :      * worker starts.
                               1456                 :                :      */
                               1457         [ -  + ]:              3 :     Assert(MyReplicationSlot);
                               1458                 :                : 
                               1459         [ -  + ]:              3 :     SpinLockAcquire(&worker->relmutex);
                               1460                 :              3 :     nonremovable_xid = worker->oldest_nonremovable_xid;
                               1461                 :              3 :     SpinLockRelease(&worker->relmutex);
                               1462                 :                : 
                               1463                 :                :     /*
                               1464                 :                :      * Return if the apply worker has stopped retention concurrently.
                               1465                 :                :      *
                               1466                 :                :      * Although this function is invoked only when retentionactive is true,
                               1467                 :                :      * the apply worker might stop retention after the launcher fetches the
                               1468                 :                :      * retentionactive flag.
                               1469                 :                :      */
  106                          1470         [ -  + ]:              3 :     if (!TransactionIdIsValid(nonremovable_xid))
  106 akapila@postgresql.o     1471                 :UNC           0 :         return;
                               1472                 :                : 
  147 akapila@postgresql.o     1473   [ -  +  -  - ]:GNC           3 :     if (!TransactionIdIsValid(*xmin) ||
  147 akapila@postgresql.o     1474                 :UNC           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
  147 akapila@postgresql.o     1475                 :GNC           3 :         *xmin = nonremovable_xid;
                               1476                 :                : }
                               1477                 :                : 
                               1478                 :                : /*
                               1479                 :                :  * Acquire the replication slot used to retain information for conflict
                               1480                 :                :  * detection, if it exists.
                               1481                 :                :  *
                               1482                 :                :  * Return true if successfully acquired, otherwise return false.
                               1483                 :                :  */
                               1484                 :                : static bool
                               1485                 :            413 : acquire_conflict_slot_if_exists(void)
                               1486                 :                : {
                               1487         [ +  + ]:            413 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
                               1488                 :            412 :         return false;
                               1489                 :                : 
                               1490                 :              1 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
                               1491                 :              1 :     return true;
                               1492                 :                : }
                               1493                 :                : 
                               1494                 :                : /*
                               1495                 :                :  * Update the xmin of the replication slot used to retain information required
                               1496                 :                :  * for conflict detection.
                               1497                 :                :  */
                               1498                 :                : static void
  106                          1499                 :              3 : update_conflict_slot_xmin(TransactionId new_xmin)
                               1500                 :                : {
  147                          1501         [ -  + ]:              3 :     Assert(MyReplicationSlot);
  106                          1502   [ +  -  -  + ]:              3 :     Assert(!TransactionIdIsValid(new_xmin) ||
                               1503                 :                :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
                               1504                 :                : 
                               1505                 :                :     /* Return if the xmin value of the slot is already up to date */
  147                          1506         [ +  + ]:              3 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
                               1507                 :              2 :         return;
                               1508                 :                : 
                               1509         [ -  + ]:              1 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1510                 :              1 :     MyReplicationSlot->effective_xmin = new_xmin;
                               1511                 :              1 :     MyReplicationSlot->data.xmin = new_xmin;
                               1512                 :              1 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1513                 :                : 
                               1514         [ -  + ]:              1 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
                               1515                 :                : 
                               1516                 :              1 :     ReplicationSlotMarkDirty();
                               1517                 :              1 :     ReplicationSlotsComputeRequiredXmin(false);
                               1518                 :                : 
                               1519                 :                :     /*
                               1520                 :                :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
                               1521                 :                :      * each time. This is acceptable because all concurrent transactions on
                               1522                 :                :      * the publisher that require the data preceding the slot's xmin should
                               1523                 :                :      * have already been applied and flushed on the subscriber before the xmin
                               1524                 :                :      * is advanced. So, even if the slot's xmin regresses after a restart, it
                               1525                 :                :      * will be advanced again in the next cycle. Therefore, no data required
                               1526                 :                :      * for conflict detection will be prematurely removed.
                               1527                 :                :      */
                               1528                 :              1 :     return;
                               1529                 :                : }
                               1530                 :                : 
                               1531                 :                : /*
                               1532                 :                :  * Initialize the xmin for the conflict detection slot.
                               1533                 :                :  */
                               1534                 :                : static void
  106                          1535                 :              3 : init_conflict_slot_xmin(void)
                               1536                 :                : {
                               1537                 :                :     TransactionId xmin_horizon;
                               1538                 :                : 
                               1539                 :                :     /* Replication slot must exist but shouldn't be initialized. */
                               1540   [ +  -  -  + ]:              3 :     Assert(MyReplicationSlot &&
                               1541                 :                :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
                               1542                 :                : 
  147                          1543                 :              3 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
                               1544                 :                : 
                               1545                 :              3 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
                               1546                 :                : 
                               1547         [ -  + ]:              3 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1548                 :              3 :     MyReplicationSlot->effective_xmin = xmin_horizon;
                               1549                 :              3 :     MyReplicationSlot->data.xmin = xmin_horizon;
                               1550                 :              3 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1551                 :                : 
                               1552                 :              3 :     ReplicationSlotsComputeRequiredXmin(true);
                               1553                 :                : 
                               1554                 :              3 :     LWLockRelease(ProcArrayLock);
                               1555                 :                : 
                               1556                 :                :     /* Write this slot to disk */
                               1557                 :              3 :     ReplicationSlotMarkDirty();
                               1558                 :              3 :     ReplicationSlotSave();
                               1559                 :              3 : }
                               1560                 :                : 
                               1561                 :                : /*
                               1562                 :                :  * Create and acquire the replication slot used to retain information for
                               1563                 :                :  * conflict detection, if not yet.
                               1564                 :                :  */
                               1565                 :                : void
  106                          1566                 :              8 : CreateConflictDetectionSlot(void)
                               1567                 :                : {
                               1568                 :                :     /* Exit early, if the replication slot is already created and acquired */
                               1569         [ +  + ]:              8 :     if (MyReplicationSlot)
                               1570                 :              5 :         return;
                               1571                 :                : 
                               1572         [ +  - ]:              3 :     ereport(LOG,
                               1573                 :                :             errmsg("creating replication conflict detection slot"));
                               1574                 :                : 
                               1575                 :              3 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
                               1576                 :                :                           false, false);
                               1577                 :                : 
                               1578                 :              3 :     init_conflict_slot_xmin();
                               1579                 :                : }
                               1580                 :                : 
                               1581                 :                : /*
                               1582                 :                :  * Is current process the logical replication launcher?
                               1583                 :                :  */
                               1584                 :                : bool
 3114 andres@anarazel.de       1585                 :CBC        2358 : IsLogicalLauncher(void)
                               1586                 :                : {
                               1587                 :           2358 :     return LogicalRepCtx->launcher_pid == MyProcPid;
                               1588                 :                : }
                               1589                 :                : 
                               1590                 :                : /*
                               1591                 :                :  * Return the pid of the leader apply worker if the given pid is the pid of a
                               1592                 :                :  * parallel apply worker, otherwise, return InvalidPid.
                               1593                 :                :  */
                               1594                 :                : pid_t
 1064 akapila@postgresql.o     1595                 :            866 : GetLeaderApplyWorkerPid(pid_t pid)
                               1596                 :                : {
                               1597                 :            866 :     int         leader_pid = InvalidPid;
                               1598                 :                :     int         i;
                               1599                 :                : 
                               1600                 :            866 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1601                 :                : 
                               1602         [ +  + ]:           4330 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1603                 :                :     {
                               1604                 :           3464 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                               1605                 :                : 
                               1606   [ +  +  -  +  :           3464 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
                                        -  -  -  - ]
                               1607                 :                :         {
 1064 akapila@postgresql.o     1608                 :UBC           0 :             leader_pid = w->leader_pid;
                               1609                 :              0 :             break;
                               1610                 :                :         }
                               1611                 :                :     }
                               1612                 :                : 
 1064 akapila@postgresql.o     1613                 :CBC         866 :     LWLockRelease(LogicalRepWorkerLock);
                               1614                 :                : 
                               1615                 :            866 :     return leader_pid;
                               1616                 :                : }
                               1617                 :                : 
                               1618                 :                : /*
                               1619                 :                :  * Returns state of the subscriptions.
                               1620                 :                :  */
                               1621                 :                : Datum
 3254 peter_e@gmx.net          1622                 :              1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
                               1623                 :                : {
                               1624                 :                : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
                               1625         [ -  + ]:              1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
                               1626                 :                :     int         i;
                               1627                 :              1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1628                 :                : 
 1156 michael@paquier.xyz      1629                 :              1 :     InitMaterializedSRF(fcinfo, 0);
                               1630                 :                : 
                               1631                 :                :     /* Make sure we get consistent view of the workers. */
 3254 peter_e@gmx.net          1632                 :              1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1633                 :                : 
 1289 tgl@sss.pgh.pa.us        1634         [ +  + ]:              5 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1635                 :                :     {
                               1636                 :                :         /* for each row */
 1250 peter@eisentraut.org     1637                 :              4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1638                 :              4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1639                 :                :         int         worker_pid;
                               1640                 :                :         LogicalRepWorker worker;
                               1641                 :                : 
 3254 peter_e@gmx.net          1642                 :              4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
                               1643                 :                :                sizeof(LogicalRepWorker));
                               1644   [ +  +  -  + ]:              4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
                               1645                 :              2 :             continue;
                               1646                 :                : 
                               1647   [ -  +  -  - ]:              2 :         if (OidIsValid(subid) && worker.subid != subid)
 3254 peter_e@gmx.net          1648                 :UBC           0 :             continue;
                               1649                 :                : 
 3254 peter_e@gmx.net          1650                 :CBC           2 :         worker_pid = worker.proc->pid;
                               1651                 :                : 
                               1652                 :              2 :         values[0] = ObjectIdGetDatum(worker.subid);
   42 akapila@postgresql.o     1653   [ +  -  -  + ]:GNC           2 :         if (isTableSyncWorker(&worker))
 3191 peter_e@gmx.net          1654                 :UBC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
                               1655                 :                :         else
 3191 peter_e@gmx.net          1656                 :CBC           2 :             nulls[1] = true;
                               1657                 :              2 :         values[2] = Int32GetDatum(worker_pid);
                               1658                 :                : 
 1064 akapila@postgresql.o     1659   [ +  -  -  + ]:              2 :         if (isParallelApplyWorker(&worker))
 1064 akapila@postgresql.o     1660                 :UBC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
                               1661                 :                :         else
 3191 peter_e@gmx.net          1662                 :CBC           2 :             nulls[3] = true;
                               1663                 :                : 
   41 alvherre@kurilemu.de     1664         [ -  + ]:GNC           2 :         if (!XLogRecPtrIsValid(worker.last_lsn))
 1064 akapila@postgresql.o     1665                 :UBC           0 :             nulls[4] = true;
                               1666                 :                :         else
 1064 akapila@postgresql.o     1667                 :CBC           2 :             values[4] = LSNGetDatum(worker.last_lsn);
 3254 peter_e@gmx.net          1668         [ -  + ]:              2 :         if (worker.last_send_time == 0)
 1064 akapila@postgresql.o     1669                 :UBC           0 :             nulls[5] = true;
                               1670                 :                :         else
 1064 akapila@postgresql.o     1671                 :CBC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
 3254 peter_e@gmx.net          1672         [ -  + ]:              2 :         if (worker.last_recv_time == 0)
 1064 akapila@postgresql.o     1673                 :UBC           0 :             nulls[6] = true;
                               1674                 :                :         else
 1064 akapila@postgresql.o     1675                 :CBC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
   41 alvherre@kurilemu.de     1676         [ -  + ]:GNC           2 :         if (!XLogRecPtrIsValid(worker.reply_lsn))
 1064 akapila@postgresql.o     1677                 :UBC           0 :             nulls[7] = true;
                               1678                 :                :         else
 1064 akapila@postgresql.o     1679                 :CBC           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
 3254 peter_e@gmx.net          1680         [ -  + ]:              2 :         if (worker.reply_time == 0)
 1064 akapila@postgresql.o     1681                 :UBC           0 :             nulls[8] = true;
                               1682                 :                :         else
 1064 akapila@postgresql.o     1683                 :CBC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
                               1684                 :                : 
  814 nathan@postgresql.or     1685   [ +  -  -  -  :              2 :         switch (worker.type)
                                              -  - ]
                               1686                 :                :         {
                               1687                 :              2 :             case WORKERTYPE_APPLY:
                               1688                 :              2 :                 values[9] = CStringGetTextDatum("apply");
                               1689                 :              2 :                 break;
  814 nathan@postgresql.or     1690                 :UBC           0 :             case WORKERTYPE_PARALLEL_APPLY:
                               1691                 :              0 :                 values[9] = CStringGetTextDatum("parallel apply");
                               1692                 :              0 :                 break;
   42 akapila@postgresql.o     1693                 :UNC           0 :             case WORKERTYPE_SEQUENCESYNC:
                               1694                 :              0 :                 values[9] = CStringGetTextDatum("sequence synchronization");
                               1695                 :              0 :                 break;
  814 nathan@postgresql.or     1696                 :UBC           0 :             case WORKERTYPE_TABLESYNC:
                               1697                 :              0 :                 values[9] = CStringGetTextDatum("table synchronization");
                               1698                 :              0 :                 break;
                               1699                 :              0 :             case WORKERTYPE_UNKNOWN:
                               1700                 :                :                 /* Should never happen. */
                               1701         [ #  # ]:              0 :                 elog(ERROR, "unknown worker type");
                               1702                 :                :         }
                               1703                 :                : 
 1381 michael@paquier.xyz      1704                 :CBC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1705                 :                :                              values, nulls);
                               1706                 :                : 
                               1707                 :                :         /*
                               1708                 :                :          * If only a single subscription was requested, and we found it,
                               1709                 :                :          * break.
                               1710                 :                :          */
 3254 peter_e@gmx.net          1711         [ -  + ]:              2 :         if (OidIsValid(subid))
 3254 peter_e@gmx.net          1712                 :UBC           0 :             break;
                               1713                 :                :     }
                               1714                 :                : 
 3254 peter_e@gmx.net          1715                 :CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
                               1716                 :                : 
                               1717                 :              1 :     return (Datum) 0;
                               1718                 :                : }
        

Generated by: LCOV version 2.4-beta