LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: bed3ffbf9d952be6c7d739d068cdce44c046dfb7 vs 574581b50ac9c63dd9e4abebb731a3b67e5b50f6 Lines: 89.1 % 587 523 6 58 1 1 122 399 30
Current Date: 2026-05-05 10:23:31 +0900 Functions: 100.0 % 37 37 17 20 5
Baseline: lcov-20260505-025707-baseline Branches: 66.1 % 392 259 33 2 98 4 69 186 1 1 7 19
Baseline Date: 2026-05-05 10:27:06 +0900 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 9 9 9
(30,360] days: 94.7 % 133 126 6 1 1 113 12
(360..) days: 87.2 % 445 388 57 1 387
Function coverage date bins:
(7,30] days: 100.0 % 2 2 2
(30,360] days: 100.0 % 9 9 9
(360..) days: 100.0 % 26 26 6 20
Branch coverage date bins:
(7,30] days: 100.0 % 2 2 2
(30,360] days: 67.3 % 104 70 33 1 1 67 2
(360..) days: 64.9 % 288 187 2 97 3 184 1 1

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * launcher.c
                                  3                 :                :  *     PostgreSQL logical replication worker launcher process
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2016-2026, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/launcher.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This module contains the logical replication worker launcher which
                                 12                 :                :  *    uses the background worker infrastructure to start the logical
                                 13                 :                :  *    replication workers for every enabled subscription.
                                 14                 :                :  *
                                 15                 :                :  *-------------------------------------------------------------------------
                                 16                 :                :  */
                                 17                 :                : 
                                 18                 :                : #include "postgres.h"
                                 19                 :                : 
                                 20                 :                : #include "access/heapam.h"
                                 21                 :                : #include "access/htup.h"
                                 22                 :                : #include "access/htup_details.h"
                                 23                 :                : #include "access/tableam.h"
                                 24                 :                : #include "access/xact.h"
                                 25                 :                : #include "catalog/pg_subscription.h"
                                 26                 :                : #include "catalog/pg_subscription_rel.h"
                                 27                 :                : #include "funcapi.h"
                                 28                 :                : #include "lib/dshash.h"
                                 29                 :                : #include "miscadmin.h"
                                 30                 :                : #include "pgstat.h"
                                 31                 :                : #include "postmaster/bgworker.h"
                                 32                 :                : #include "postmaster/interrupt.h"
                                 33                 :                : #include "replication/logicallauncher.h"
                                 34                 :                : #include "replication/origin.h"
                                 35                 :                : #include "replication/slot.h"
                                 36                 :                : #include "replication/walreceiver.h"
                                 37                 :                : #include "replication/worker_internal.h"
                                 38                 :                : #include "storage/ipc.h"
                                 39                 :                : #include "storage/proc.h"
                                 40                 :                : #include "storage/procarray.h"
                                 41                 :                : #include "storage/subsystems.h"
                                 42                 :                : #include "tcop/tcopprot.h"
                                 43                 :                : #include "utils/builtins.h"
                                 44                 :                : #include "utils/memutils.h"
                                 45                 :                : #include "utils/pg_lsn.h"
                                 46                 :                : #include "utils/snapmgr.h"
                                 47                 :                : #include "utils/syscache.h"
                                 48                 :                : #include "utils/wait_event.h"
                                 49                 :                : 
                                 50                 :                : /* max sleep time between cycles (3min) */
                                 51                 :                : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
                                 52                 :                : 
                                 53                 :                : /* GUC variables */
                                 54                 :                : int         max_logical_replication_workers = 4;
                                 55                 :                : int         max_sync_workers_per_subscription = 2;
                                 56                 :                : int         max_parallel_apply_workers_per_subscription = 2;
                                 57                 :                : 
                                 58                 :                : LogicalRepWorker *MyLogicalRepWorker = NULL;
                                 59                 :                : 
                                 60                 :                : typedef struct LogicalRepCtxStruct
                                 61                 :                : {
                                 62                 :                :     /* Supervisor process. */
                                 63                 :                :     pid_t       launcher_pid;
                                 64                 :                : 
                                 65                 :                :     /* Hash table holding last start times of subscriptions' apply workers. */
                                 66                 :                :     dsa_handle  last_start_dsa;
                                 67                 :                :     dshash_table_handle last_start_dsh;
                                 68                 :                : 
                                 69                 :                :     /* Background workers. */
                                 70                 :                :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
                                 71                 :                : } LogicalRepCtxStruct;
                                 72                 :                : 
                                 73                 :                : static LogicalRepCtxStruct *LogicalRepCtx;
                                 74                 :                : 
                                 75                 :                : static void ApplyLauncherShmemRequest(void *arg);
                                 76                 :                : static void ApplyLauncherShmemInit(void *arg);
                                 77                 :                : 
                                 78                 :                : const ShmemCallbacks ApplyLauncherShmemCallbacks = {
                                 79                 :                :     .request_fn = ApplyLauncherShmemRequest,
                                 80                 :                :     .init_fn = ApplyLauncherShmemInit,
                                 81                 :                : };
                                 82                 :                : 
                                 83                 :                : /* an entry in the last-start-times shared hash table */
                                 84                 :                : typedef struct LauncherLastStartTimesEntry
                                 85                 :                : {
                                 86                 :                :     Oid         subid;          /* OID of logrep subscription (hash key) */
                                 87                 :                :     TimestampTz last_start_time;    /* last time its apply worker was started */
                                 88                 :                : } LauncherLastStartTimesEntry;
                                 89                 :                : 
                                 90                 :                : /* parameters for the last-start-times shared hash table */
                                 91                 :                : static const dshash_parameters dsh_params = {
                                 92                 :                :     sizeof(Oid),
                                 93                 :                :     sizeof(LauncherLastStartTimesEntry),
                                 94                 :                :     dshash_memcmp,
                                 95                 :                :     dshash_memhash,
                                 96                 :                :     dshash_memcpy,
                                 97                 :                :     LWTRANCHE_LAUNCHER_HASH
                                 98                 :                : };
                                 99                 :                : 
                                100                 :                : static dsa_area *last_start_times_dsa = NULL;
                                101                 :                : static dshash_table *last_start_times = NULL;
                                102                 :                : 
                                103                 :                : static bool on_commit_launcher_wakeup = false;
                                104                 :                : 
                                105                 :                : 
                                106                 :                : static void logicalrep_launcher_onexit(int code, Datum arg);
                                107                 :                : static void logicalrep_worker_onexit(int code, Datum arg);
                                108                 :                : static void logicalrep_worker_detach(void);
                                109                 :                : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
                                110                 :                : static int  logicalrep_pa_worker_count(Oid subid);
                                111                 :                : static void logicalrep_launcher_attach_dshmem(void);
                                112                 :                : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
                                113                 :                : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
                                114                 :                : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
                                115                 :                : static bool acquire_conflict_slot_if_exists(void);
                                116                 :                : static void update_conflict_slot_xmin(TransactionId new_xmin);
                                117                 :                : static void init_conflict_slot_xmin(void);
                                118                 :                : 
                                119                 :                : 
                                120                 :                : /*
                                121                 :                :  * Load the list of subscriptions.
                                122                 :                :  *
                                123                 :                :  * Only the fields interesting for worker start/stop functions are filled for
                                124                 :                :  * each subscription.
                                125                 :                :  */
                                126                 :                : static List *
 3393 peter_e@gmx.net           127                 :CBC        3212 : get_subscription_list(void)
                                128                 :                : {
                                129                 :           3212 :     List       *res = NIL;
                                130                 :                :     Relation    rel;
                                131                 :                :     TableScanDesc scan;
                                132                 :                :     HeapTuple   tup;
                                133                 :                :     MemoryContext resultcxt;
                                134                 :                : 
                                135                 :                :     /* This is the context that we will allocate our output data in */
                                136                 :           3212 :     resultcxt = CurrentMemoryContext;
                                137                 :                : 
                                138                 :                :     /*
                                139                 :                :      * Start a transaction so we can access pg_subscription.
                                140                 :                :      */
                                141                 :           3212 :     StartTransactionCommand();
                                142                 :                : 
 2661 andres@anarazel.de        143                 :           3212 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
 2612                           144                 :           3212 :     scan = table_beginscan_catalog(rel, 0, NULL);
                                145                 :                : 
 3393 peter_e@gmx.net           146         [ +  + ]:           4182 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                147                 :                :     {
                                148                 :            970 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
                                149                 :                :         Subscription *sub;
                                150                 :                :         MemoryContext oldcxt;
                                151                 :                : 
                                152                 :                :         /*
                                153                 :                :          * Allocate our results in the caller's context, not the
                                154                 :                :          * transaction's. We do this inside the loop, and restore the original
                                155                 :                :          * context at the end, so that leaky things like heap_getnext() are
                                156                 :                :          * not called in a potentially long-lived context.
                                157                 :                :          */
                                158                 :            970 :         oldcxt = MemoryContextSwitchTo(resultcxt);
                                159                 :                : 
  146 michael@paquier.xyz       160                 :GNC         970 :         sub = palloc0_object(Subscription);
 2723 andres@anarazel.de        161                 :CBC         970 :         sub->oid = subform->oid;
 3393 peter_e@gmx.net           162                 :            970 :         sub->dbid = subform->subdbid;
                                163                 :            970 :         sub->owner = subform->subowner;
                                164                 :            970 :         sub->enabled = subform->subenabled;
                                165                 :            970 :         sub->name = pstrdup(NameStr(subform->subname));
  286 akapila@postgresql.o      166                 :GNC         970 :         sub->retaindeadtuples = subform->subretaindeadtuples;
  245                           167                 :            970 :         sub->retentionactive = subform->subretentionactive;
                                168                 :                :         /* We don't fill fields we are not interested in. */
                                169                 :                : 
 3393 peter_e@gmx.net           170                 :CBC         970 :         res = lappend(res, sub);
                                171                 :            970 :         MemoryContextSwitchTo(oldcxt);
                                172                 :                :     }
                                173                 :                : 
 2612 andres@anarazel.de        174                 :           3212 :     table_endscan(scan);
 2661                           175                 :           3212 :     table_close(rel, AccessShareLock);
                                176                 :                : 
 3393 peter_e@gmx.net           177                 :           3212 :     CommitTransactionCommand();
                                178                 :                : 
                                179                 :           3212 :     return res;
                                180                 :                : }
                                181                 :                : 
                                182                 :                : /*
                                183                 :                :  * Wait for a background worker to start up and attach to the shmem context.
                                184                 :                :  *
                                185                 :                :  * This is only needed for cleaning up the shared memory in case the worker
                                186                 :                :  * fails to attach.
                                187                 :                :  *
                                188                 :                :  * Returns whether the attach was successful.
                                189                 :                :  */
                                190                 :                : static bool
                                191                 :            457 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                192                 :                :                                uint16 generation,
                                193                 :                :                                BackgroundWorkerHandle *handle)
                                194                 :                : {
  315 tgl@sss.pgh.pa.us         195                 :            457 :     bool        result = false;
                                196                 :            457 :     bool        dropped_latch = false;
                                197                 :                : 
                                198                 :                :     for (;;)
 3393 peter_e@gmx.net           199                 :           1290 :     {
                                200                 :                :         BgwHandleStatus status;
                                201                 :                :         pid_t       pid;
                                202                 :                :         int         rc;
                                203                 :                : 
                                204         [ -  + ]:           1747 :         CHECK_FOR_INTERRUPTS();
                                205                 :                : 
 3296                           206                 :           1747 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                207                 :                : 
                                208                 :                :         /* Worker either died or has started. Return false if died. */
                                209   [ +  +  +  + ]:           1747 :         if (!worker->in_use || worker->proc)
                                210                 :                :         {
  315 tgl@sss.pgh.pa.us         211                 :            453 :             result = worker->in_use;
 3296 peter_e@gmx.net           212                 :            453 :             LWLockRelease(LogicalRepWorkerLock);
  315 tgl@sss.pgh.pa.us         213                 :            453 :             break;
                                214                 :                :         }
                                215                 :                : 
 3296 peter_e@gmx.net           216                 :           1294 :         LWLockRelease(LogicalRepWorkerLock);
                                217                 :                : 
                                218                 :                :         /* Check if worker has died before attaching, and clean up after it. */
 3393                           219                 :           1294 :         status = GetBackgroundWorkerPid(handle, &pid);
                                220                 :                : 
                                221         [ -  + ]:           1294 :         if (status == BGWH_STOPPED)
                                222                 :                :         {
 3296 peter_e@gmx.net           223                 :UBC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                224                 :                :             /* Ensure that this was indeed the worker we waited for. */
                                225         [ #  # ]:              0 :             if (generation == worker->generation)
                                226                 :              0 :                 logicalrep_worker_cleanup(worker);
                                227                 :              0 :             LWLockRelease(LogicalRepWorkerLock);
  315 tgl@sss.pgh.pa.us         228                 :              0 :             break;              /* result is already false */
                                229                 :                :         }
                                230                 :                : 
                                231                 :                :         /*
                                232                 :                :          * We need timeout because we generally don't get notified via latch
                                233                 :                :          * about the worker attach.  But we don't expect to have to wait long.
                                234                 :                :          */
 3393 peter_e@gmx.net           235                 :CBC        1294 :         rc = WaitLatch(MyLatch,
                                236                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                237                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                238                 :                : 
 3255 andres@anarazel.de        239         [ +  + ]:           1294 :         if (rc & WL_LATCH_SET)
                                240                 :                :         {
                                241                 :            537 :             ResetLatch(MyLatch);
                                242         [ +  + ]:            537 :             CHECK_FOR_INTERRUPTS();
  315 tgl@sss.pgh.pa.us         243                 :            533 :             dropped_latch = true;
                                244                 :                :         }
                                245                 :                :     }
                                246                 :                : 
                                247                 :                :     /*
                                248                 :                :      * If we had to clear a latch event in order to wait, be sure to restore
                                249                 :                :      * it before exiting.  Otherwise caller may miss events.
                                250                 :                :      */
                                251         [ +  - ]:            453 :     if (dropped_latch)
                                252                 :            453 :         SetLatch(MyLatch);
                                253                 :                : 
                                254                 :            453 :     return result;
                                255                 :                : }
                                256                 :                : 
                                257                 :                : /*
                                258                 :                :  * Walks the workers array and searches for one that matches given worker type,
                                259                 :                :  * subscription id, and relation id.
                                260                 :                :  *
                                261                 :                :  * For both apply workers and sequencesync workers, the relid should be set to
                                262                 :                :  * InvalidOid, as these workers handle changes across all tables and sequences
                                263                 :                :  * respectively, rather than targeting a specific relation. For tablesync
                                264                 :                :  * workers, the relid should be set to the OID of the relation being
                                265                 :                :  * synchronized.
                                266                 :                :  */
                                267                 :                : LogicalRepWorker *
  189 akapila@postgresql.o      268                 :GNC        3425 : logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
                                269                 :                :                        bool only_running)
                                270                 :                : {
                                271                 :                :     int         i;
 3275 bruce@momjian.us          272                 :CBC        3425 :     LogicalRepWorker *res = NULL;
                                273                 :                : 
                                274                 :                :     /* relid must be valid only for table sync workers */
  189 akapila@postgresql.o      275         [ -  + ]:GNC        3425 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
 3393 peter_e@gmx.net           276         [ -  + ]:CBC        3425 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                277                 :                : 
                                278                 :                :     /* Search for an attached worker that matches the specified criteria. */
                                279         [ +  + ]:          10436 :     for (i = 0; i < max_logical_replication_workers; i++)
                                280                 :                :     {
 3275 bruce@momjian.us          281                 :           9135 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                282                 :                : 
                                283                 :                :         /* Skip parallel apply workers. */
 1212 akapila@postgresql.o      284   [ +  +  -  + ]:           9135 :         if (isParallelApplyWorker(w))
 1212 akapila@postgresql.o      285                 :UBC           0 :             continue;
                                286                 :                : 
 3296 peter_e@gmx.net           287   [ +  +  +  +  :CBC        9135 :         if (w->in_use && w->subid == subid && w->relid == relid &&
                                              +  + ]
  189 akapila@postgresql.o      288   [ +  +  +  +  :GNC        2161 :             w->type == wtype && (!only_running || w->proc))
                                              +  - ]
                                289                 :                :         {
 3393 peter_e@gmx.net           290                 :CBC        2124 :             res = w;
                                291                 :           2124 :             break;
                                292                 :                :         }
                                293                 :                :     }
                                294                 :                : 
                                295                 :           3425 :     return res;
                                296                 :                : }
                                297                 :                : 
                                298                 :                : /*
                                299                 :                :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
                                300                 :                :  * the subscription, instead of just one.
                                301                 :                :  */
                                302                 :                : List *
  650 akapila@postgresql.o      303                 :            777 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
                                304                 :                : {
                                305                 :                :     int         i;
 3196 peter_e@gmx.net           306                 :            777 :     List       *res = NIL;
                                307                 :                : 
  650 akapila@postgresql.o      308         [ +  + ]:            777 :     if (acquire_lock)
                                309                 :            147 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                310                 :                : 
 3196 peter_e@gmx.net           311         [ -  + ]:            777 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                312                 :                : 
                                313                 :                :     /* Search for attached worker for a given subscription id. */
                                314         [ +  + ]:           4011 :     for (i = 0; i < max_logical_replication_workers; i++)
                                315                 :                :     {
                                316                 :           3234 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                317                 :                : 
                                318   [ +  +  +  +  :           3234 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
                                        +  +  +  + ]
                                319                 :            520 :             res = lappend(res, w);
                                320                 :                :     }
                                321                 :                : 
  650 akapila@postgresql.o      322         [ +  + ]:            777 :     if (acquire_lock)
                                323                 :            147 :         LWLockRelease(LogicalRepWorkerLock);
                                324                 :                : 
 3196 peter_e@gmx.net           325                 :            777 :     return res;
                                326                 :                : }
                                327                 :                : 
                                328                 :                : /*
                                329                 :                :  * Start new logical replication background worker, if possible.
                                330                 :                :  *
                                331                 :                :  * Returns true on success, false on failure.
                                332                 :                :  */
                                333                 :                : bool
  995 akapila@postgresql.o      334                 :            457 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                335                 :                :                          Oid dbid, Oid subid, const char *subname, Oid userid,
                                336                 :                :                          Oid relid, dsm_handle subworker_dsm,
                                337                 :                :                          bool retain_dead_tuples)
                                338                 :                : {
                                339                 :                :     BackgroundWorker bgw;
                                340                 :                :     BackgroundWorkerHandle *bgw_handle;
                                341                 :                :     uint16      generation;
                                342                 :                :     int         i;
 3275 bruce@momjian.us          343                 :            457 :     int         slot = 0;
                                344                 :            457 :     LogicalRepWorker *worker = NULL;
                                345                 :                :     int         nsyncworkers;
                                346                 :                :     int         nparallelapplyworkers;
                                347                 :                :     TimestampTz now;
  995 akapila@postgresql.o      348                 :            457 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
  181 akapila@postgresql.o      349                 :GNC         457 :     bool        is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
  995 akapila@postgresql.o      350                 :CBC         457 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
                                351                 :                : 
                                352                 :                :     /*----------
                                353                 :                :      * Sanity checks:
                                354                 :                :      * - must be valid worker type
                                355                 :                :      * - tablesync workers are only ones to have relid
                                356                 :                :      * - parallel apply worker is the only kind of subworker
                                357                 :                :      * - The replication slot used in conflict detection is created when
                                358                 :                :      *   retain_dead_tuples is enabled
                                359                 :                :      */
                                360         [ -  + ]:            457 :     Assert(wtype != WORKERTYPE_UNKNOWN);
                                361         [ -  + ]:            457 :     Assert(is_tablesync_worker == OidIsValid(relid));
                                362         [ -  + ]:            457 :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
  286 akapila@postgresql.o      363   [ +  +  -  + ]:GNC         457 :     Assert(!retain_dead_tuples || MyReplicationSlot);
                                364                 :                : 
 3268 peter_e@gmx.net           365         [ +  + ]:CBC         457 :     ereport(DEBUG1,
                                366                 :                :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                                367                 :                :                              subname)));
                                368                 :                : 
                                369                 :                :     /* Report this after the initial starting message for consistency. */
  410 msawada@postgresql.o      370         [ -  + ]:            457 :     if (max_active_replication_origins == 0)
 3393 peter_e@gmx.net           371         [ #  # ]:UBC           0 :         ereport(ERROR,
                                372                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                373                 :                :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
                                374                 :                : 
                                375                 :                :     /*
                                376                 :                :      * We need to do the modification of the shared memory under lock so that
                                377                 :                :      * we have consistent view.
                                378                 :                :      */
 3393 peter_e@gmx.net           379                 :CBC         457 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                380                 :                : 
 3296                           381                 :            457 : retry:
                                382                 :                :     /* Find unused worker slot. */
                                383         [ +  - ]:            793 :     for (i = 0; i < max_logical_replication_workers; i++)
                                384                 :                :     {
                                385                 :            793 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                386                 :                : 
                                387         [ +  + ]:            793 :         if (!w->in_use)
                                388                 :                :         {
                                389                 :            457 :             worker = w;
                                390                 :            457 :             slot = i;
 3393                           391                 :            457 :             break;
                                392                 :                :         }
                                393                 :                :     }
                                394                 :                : 
 3296                           395                 :            457 :     nsyncworkers = logicalrep_sync_worker_count(subid);
                                396                 :                : 
                                397                 :            457 :     now = GetCurrentTimestamp();
                                398                 :                : 
                                399                 :                :     /*
                                400                 :                :      * If we didn't find a free slot, try to do garbage collection.  The
                                401                 :                :      * reason we do this is because if some worker failed to start up and its
                                402                 :                :      * parent has crashed while waiting, the in_use state was never cleared.
                                403                 :                :      */
                                404   [ +  -  -  + ]:            457 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
                                405                 :                :     {
 3275 bruce@momjian.us          406                 :UBC           0 :         bool        did_cleanup = false;
                                407                 :                : 
 3296 peter_e@gmx.net           408         [ #  # ]:              0 :         for (i = 0; i < max_logical_replication_workers; i++)
                                409                 :                :         {
                                410                 :              0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                411                 :                : 
                                412                 :                :             /*
                                413                 :                :              * If the worker was marked in use but didn't manage to attach in
                                414                 :                :              * time, clean it up.
                                415                 :                :              */
                                416   [ #  #  #  #  :              0 :             if (w->in_use && !w->proc &&
                                              #  # ]
                                417                 :              0 :                 TimestampDifferenceExceeds(w->launch_time, now,
                                418                 :                :                                            wal_receiver_timeout))
                                419                 :                :             {
                                420         [ #  # ]:              0 :                 elog(WARNING,
                                421                 :                :                      "logical replication worker for subscription %u took too long to start; canceled",
                                422                 :                :                      w->subid);
                                423                 :                : 
                                424                 :              0 :                 logicalrep_worker_cleanup(w);
                                425                 :              0 :                 did_cleanup = true;
                                426                 :                :             }
                                427                 :                :         }
                                428                 :                : 
                                429         [ #  # ]:              0 :         if (did_cleanup)
                                430                 :              0 :             goto retry;
                                431                 :                :     }
                                432                 :                : 
                                433                 :                :     /*
                                434                 :                :      * We don't allow to invoke more sync workers once we have reached the
                                435                 :                :      * sync worker limit per subscription. So, just return silently as we
                                436                 :                :      * might get here because of an otherwise harmless race condition.
                                437                 :                :      */
  181 akapila@postgresql.o      438   [ +  +  +  + ]:GNC         457 :     if ((is_tablesync_worker || is_sequencesync_worker) &&
                                439         [ -  + ]:            222 :         nsyncworkers >= max_sync_workers_per_subscription)
                                440                 :                :     {
 3296 peter_e@gmx.net           441                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 1212 akapila@postgresql.o      442                 :              0 :         return false;
                                443                 :                :     }
                                444                 :                : 
 1212 akapila@postgresql.o      445                 :CBC         457 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
                                446                 :                : 
                                447                 :                :     /*
                                448                 :                :      * Return false if the number of parallel apply workers reached the limit
                                449                 :                :      * per subscription.
                                450                 :                :      */
                                451         [ +  + ]:            457 :     if (is_parallel_apply_worker &&
                                452         [ -  + ]:             12 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
                                453                 :                :     {
 1212 akapila@postgresql.o      454                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                455                 :              0 :         return false;
                                456                 :                :     }
                                457                 :                : 
                                458                 :                :     /*
                                459                 :                :      * However if there are no more free worker slots, inform user about it
                                460                 :                :      * before exiting.
                                461                 :                :      */
 3393 peter_e@gmx.net           462         [ -  + ]:CBC         457 :     if (worker == NULL)
                                463                 :                :     {
 3388 fujii@postgresql.org      464                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3393 peter_e@gmx.net           465         [ #  # ]:              0 :         ereport(WARNING,
                                466                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                467                 :                :                  errmsg("out of logical replication worker slots"),
                                468                 :                :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
 1212 akapila@postgresql.o      469                 :              0 :         return false;
                                470                 :                :     }
                                471                 :                : 
                                472                 :                :     /* Prepare the worker slot. */
  995 akapila@postgresql.o      473                 :CBC         457 :     worker->type = wtype;
 3296 peter_e@gmx.net           474                 :            457 :     worker->launch_time = now;
                                475                 :            457 :     worker->in_use = true;
                                476                 :            457 :     worker->generation++;
 3330                           477                 :            457 :     worker->proc = NULL;
 3393                           478                 :            457 :     worker->dbid = dbid;
                                479                 :            457 :     worker->userid = userid;
                                480                 :            457 :     worker->subid = subid;
 3330                           481                 :            457 :     worker->relid = relid;
                                482                 :            457 :     worker->relstate = SUBREL_STATE_UNKNOWN;
                                483                 :            457 :     worker->relstate_lsn = InvalidXLogRecPtr;
 1706 akapila@postgresql.o      484                 :            457 :     worker->stream_fileset = NULL;
 1203                           485         [ +  + ]:            457 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 1212                           486                 :            457 :     worker->parallel_apply = is_parallel_apply_worker;
  286 akapila@postgresql.o      487                 :GNC         457 :     worker->oldest_nonremovable_xid = retain_dead_tuples
                                488                 :              2 :         ? MyReplicationSlot->data.xmin
                                489         [ +  + ]:            457 :         : InvalidTransactionId;
 3330 peter_e@gmx.net           490                 :CBC         457 :     worker->last_lsn = InvalidXLogRecPtr;
                                491                 :            457 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
                                492                 :            457 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
                                493                 :            457 :     worker->reply_lsn = InvalidXLogRecPtr;
                                494                 :            457 :     TIMESTAMP_NOBEGIN(worker->reply_time);
  181 akapila@postgresql.o      495                 :GNC         457 :     worker->last_seqsync_start_time = 0;
                                496                 :                : 
                                497                 :                :     /* Before releasing lock, remember generation for future identification. */
 3151 tgl@sss.pgh.pa.us         498                 :CBC         457 :     generation = worker->generation;
                                499                 :                : 
 3393 peter_e@gmx.net           500                 :            457 :     LWLockRelease(LogicalRepWorkerLock);
                                501                 :                : 
                                502                 :                :     /* Register the new dynamic worker. */
 3306 tgl@sss.pgh.pa.us         503                 :            457 :     memset(&bgw, 0, sizeof(bgw));
 3275 bruce@momjian.us          504                 :            457 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                505                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3393 peter_e@gmx.net           506                 :            457 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 1037 nathan@postgresql.or      507                 :            457 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
                                508                 :                : 
  987 akapila@postgresql.o      509   [ +  +  +  +  :            457 :     switch (worker->type)
                                              -  - ]
                                510                 :                :     {
                                511                 :            223 :         case WORKERTYPE_APPLY:
                                512                 :            223 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
                                513                 :            223 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                514                 :                :                      "logical replication apply worker for subscription %u",
                                515                 :                :                      subid);
                                516                 :            223 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
                                517                 :            223 :             break;
                                518                 :                : 
                                519                 :             12 :         case WORKERTYPE_PARALLEL_APPLY:
                                520                 :             12 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
                                521                 :             12 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                522                 :                :                      "logical replication parallel apply worker for subscription %u",
                                523                 :                :                      subid);
                                524                 :             12 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
                                525                 :                : 
                                526                 :             12 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
                                527                 :             12 :             break;
                                528                 :                : 
  181 akapila@postgresql.o      529                 :GNC          10 :         case WORKERTYPE_SEQUENCESYNC:
                                530                 :             10 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
                                531                 :             10 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                532                 :                :                      "logical replication sequencesync worker for subscription %u",
                                533                 :                :                      subid);
                                534                 :             10 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
                                535                 :             10 :             break;
                                536                 :                : 
  987 akapila@postgresql.o      537                 :CBC         212 :         case WORKERTYPE_TABLESYNC:
  181 akapila@postgresql.o      538                 :GNC         212 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
  987 akapila@postgresql.o      539                 :CBC         212 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                540                 :                :                      "logical replication tablesync worker for subscription %u sync %u",
                                541                 :                :                      subid,
                                542                 :                :                      relid);
                                543                 :            212 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
                                544                 :            212 :             break;
                                545                 :                : 
  987 akapila@postgresql.o      546                 :UBC           0 :         case WORKERTYPE_UNKNOWN:
                                547                 :                :             /* Should never happen. */
                                548         [ #  # ]:              0 :             elog(ERROR, "unknown worker type");
                                549                 :                :     }
                                550                 :                : 
 3393 peter_e@gmx.net           551                 :CBC         457 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
                                552                 :            457 :     bgw.bgw_notify_pid = MyProcPid;
 3303 fujii@postgresql.org      553                 :            457 :     bgw.bgw_main_arg = Int32GetDatum(slot);
                                554                 :                : 
 3393 peter_e@gmx.net           555         [ -  + ]:            457 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
                                556                 :                :     {
                                557                 :                :         /* Failed to start worker, so clean up the worker slot. */
 3151 tgl@sss.pgh.pa.us         558                 :UBC           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                559         [ #  # ]:              0 :         Assert(generation == worker->generation);
                                560                 :              0 :         logicalrep_worker_cleanup(worker);
                                561                 :              0 :         LWLockRelease(LogicalRepWorkerLock);
                                562                 :                : 
 3393 peter_e@gmx.net           563         [ #  # ]:              0 :         ereport(WARNING,
                                564                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                565                 :                :                  errmsg("out of background worker slots"),
                                566                 :                :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
 1212 akapila@postgresql.o      567                 :              0 :         return false;
                                568                 :                :     }
                                569                 :                : 
                                570                 :                :     /* Now wait until it attaches. */
 1212 akapila@postgresql.o      571                 :CBC         457 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
                                572                 :                : }
                                573                 :                : 
                                574                 :                : /*
                                575                 :                :  * Internal function to stop the worker and wait until it detaches from the
                                576                 :                :  * slot.
                                577                 :                :  */
                                578                 :                : static void
                                579                 :             90 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
                                580                 :                : {
                                581                 :                :     uint16      generation;
                                582                 :                : 
                                583         [ -  + ]:             90 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
                                584                 :                : 
                                585                 :                :     /*
                                586                 :                :      * Remember which generation was our worker so we can check if what we see
                                587                 :                :      * is still the same one.
                                588                 :                :      */
 3296 peter_e@gmx.net           589                 :             90 :     generation = worker->generation;
                                590                 :                : 
                                591                 :                :     /*
                                592                 :                :      * If we found a worker but it does not have proc set then it is still
                                593                 :                :      * starting up; wait for it to finish starting and then kill it.
                                594                 :                :      */
                                595   [ +  -  +  + ]:             90 :     while (worker->in_use && !worker->proc)
                                596                 :                :     {
                                597                 :                :         int         rc;
                                598                 :                : 
 3393                           599                 :              3 :         LWLockRelease(LogicalRepWorkerLock);
                                600                 :                : 
                                601                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3255 andres@anarazel.de        602                 :              3 :         rc = WaitLatch(MyLatch,
                                603                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                604                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                605                 :                : 
                                606         [ -  + ]:              3 :         if (rc & WL_LATCH_SET)
                                607                 :                :         {
 3255 andres@anarazel.de        608                 :UBC           0 :             ResetLatch(MyLatch);
                                609         [ #  # ]:              0 :             CHECK_FOR_INTERRUPTS();
                                610                 :                :         }
                                611                 :                : 
                                612                 :                :         /* Recheck worker status. */
 3393 peter_e@gmx.net           613                 :CBC           3 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                614                 :                : 
                                615                 :                :         /*
                                616                 :                :          * Check whether the worker slot is no longer used, which would mean
                                617                 :                :          * that the worker has exited, or whether the worker generation is
                                618                 :                :          * different, meaning that a different worker has taken the slot.
                                619                 :                :          */
 3296                           620   [ +  -  -  + ]:              3 :         if (!worker->in_use || worker->generation != generation)
 3389 peter_e@gmx.net           621                 :UBC           0 :             return;
                                622                 :                : 
                                623                 :                :         /* Worker has assigned proc, so it has started. */
 3389 peter_e@gmx.net           624         [ +  - ]:CBC           3 :         if (worker->proc)
 3393                           625                 :              3 :             break;
                                626                 :                :     }
                                627                 :                : 
                                628                 :                :     /* Now terminate the worker ... */
 1212 akapila@postgresql.o      629                 :             90 :     kill(worker->proc->pid, signo);
                                630                 :                : 
                                631                 :                :     /* ... and wait for it to die. */
                                632                 :                :     for (;;)
 3393 peter_e@gmx.net           633                 :            128 :     {
                                634                 :                :         int         rc;
                                635                 :                : 
                                636                 :                :         /* is it gone? */
 3296                           637   [ +  +  +  + ]:            218 :         if (!worker->proc || worker->generation != generation)
                                638                 :                :             break;
                                639                 :                : 
 3230 tgl@sss.pgh.pa.us         640                 :            128 :         LWLockRelease(LogicalRepWorkerLock);
                                641                 :                : 
                                642                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3255 andres@anarazel.de        643                 :            128 :         rc = WaitLatch(MyLatch,
                                644                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                645                 :                :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
                                646                 :                : 
                                647         [ +  + ]:            128 :         if (rc & WL_LATCH_SET)
                                648                 :                :         {
                                649                 :             42 :             ResetLatch(MyLatch);
                                650         [ +  + ]:             42 :             CHECK_FOR_INTERRUPTS();
                                651                 :                :         }
                                652                 :                : 
 3230 tgl@sss.pgh.pa.us         653                 :            128 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                654                 :                :     }
                                655                 :                : }
                                656                 :                : 
                                657                 :                : /*
                                658                 :                :  * Stop the logical replication worker that matches the specified worker type,
                                659                 :                :  * subscription id, and relation id.
                                660                 :                :  */
                                661                 :                : void
  189 akapila@postgresql.o      662                 :GNC         103 : logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
                                663                 :                : {
                                664                 :                :     LogicalRepWorker *worker;
                                665                 :                : 
                                666                 :                :     /* relid must be valid only for table sync workers */
                                667         [ -  + ]:            103 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
                                668                 :                : 
 1212 akapila@postgresql.o      669                 :CBC         103 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                670                 :                : 
  189 akapila@postgresql.o      671                 :GNC         103 :     worker = logicalrep_worker_find(wtype, subid, relid, false);
                                672                 :                : 
 1212 akapila@postgresql.o      673         [ +  + ]:CBC         103 :     if (worker)
                                674                 :                :     {
                                675   [ +  -  -  + ]:             81 :         Assert(!isParallelApplyWorker(worker));
                                676                 :             81 :         logicalrep_worker_stop_internal(worker, SIGTERM);
                                677                 :                :     }
                                678                 :                : 
                                679                 :            103 :     LWLockRelease(LogicalRepWorkerLock);
                                680                 :            103 : }
                                681                 :                : 
                                682                 :                : /*
                                683                 :                :  * Stop the given logical replication parallel apply worker.
                                684                 :                :  *
                                685                 :                :  * Node that the function sends SIGUSR2 instead of SIGTERM to the parallel apply
                                686                 :                :  * worker so that the worker exits cleanly.
                                687                 :                :  */
                                688                 :                : void
 1092                           689                 :              5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
                                690                 :                : {
                                691                 :                :     int         slot_no;
                                692                 :                :     uint16      generation;
                                693                 :                :     LogicalRepWorker *worker;
                                694                 :                : 
                                695         [ -  + ]:              5 :     SpinLockAcquire(&winfo->shared->mutex);
                                696                 :              5 :     generation = winfo->shared->logicalrep_worker_generation;
                                697                 :              5 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
                                698                 :              5 :     SpinLockRelease(&winfo->shared->mutex);
                                699                 :                : 
 1212                           700   [ +  -  -  + ]:              5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
                                701                 :                : 
                                702                 :                :     /*
                                703                 :                :      * Detach from the error_mq_handle for the parallel apply worker before
                                704                 :                :      * stopping it. This prevents the leader apply worker from trying to
                                705                 :                :      * receive the message from the error queue that might already be detached
                                706                 :                :      * by the parallel apply worker.
                                707                 :                :      */
 1092                           708         [ +  - ]:              5 :     if (winfo->error_mq_handle)
                                709                 :                :     {
                                710                 :              5 :         shm_mq_detach(winfo->error_mq_handle);
                                711                 :              5 :         winfo->error_mq_handle = NULL;
                                712                 :                :     }
                                713                 :                : 
 1212                           714                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                715                 :                : 
                                716                 :              5 :     worker = &LogicalRepCtx->workers[slot_no];
                                717   [ +  -  -  + ]:              5 :     Assert(isParallelApplyWorker(worker));
                                718                 :                : 
                                719                 :                :     /*
                                720                 :                :      * Only stop the worker if the generation matches and the worker is alive.
                                721                 :                :      */
                                722   [ +  -  +  - ]:              5 :     if (worker->generation == generation && worker->proc)
  223                           723                 :              5 :         logicalrep_worker_stop_internal(worker, SIGUSR2);
                                724                 :                : 
 3230 tgl@sss.pgh.pa.us         725                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
 3393 peter_e@gmx.net           726                 :              5 : }
                                727                 :                : 
                                728                 :                : /*
                                729                 :                :  * Wake up (using latch) any logical replication worker that matches the
                                730                 :                :  * specified worker type, subscription id, and relation id.
                                731                 :                :  */
                                732                 :                : void
  189 akapila@postgresql.o      733                 :GNC         227 : logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
                                734                 :                : {
                                735                 :                :     LogicalRepWorker *worker;
                                736                 :                : 
                                737                 :                :     /* relid must be valid only for table sync workers */
                                738         [ -  + ]:            227 :     Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
                                739                 :                : 
 3330 peter_e@gmx.net           740                 :CBC         227 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                741                 :                : 
  189 akapila@postgresql.o      742                 :GNC         227 :     worker = logicalrep_worker_find(wtype, subid, relid, true);
                                743                 :                : 
 3330 peter_e@gmx.net           744         [ +  - ]:CBC         227 :     if (worker)
                                745                 :            227 :         logicalrep_worker_wakeup_ptr(worker);
                                746                 :                : 
 3231 tgl@sss.pgh.pa.us         747                 :            227 :     LWLockRelease(LogicalRepWorkerLock);
 3330 peter_e@gmx.net           748                 :            227 : }
                                749                 :                : 
                                750                 :                : /*
                                751                 :                :  * Wake up (using latch) the specified logical replication worker.
                                752                 :                :  *
                                753                 :                :  * Caller must hold lock, else worker->proc could change under us.
                                754                 :                :  */
                                755                 :                : void
                                756                 :            698 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
                                757                 :                : {
 3231 tgl@sss.pgh.pa.us         758         [ -  + ]:            698 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                759                 :                : 
 3330 peter_e@gmx.net           760                 :            698 :     SetLatch(&worker->proc->procLatch);
                                761                 :            698 : }
                                762                 :                : 
                                763                 :                : /*
                                764                 :                :  * Attach to a slot.
                                765                 :                :  */
                                766                 :                : void
 3393                           767                 :            592 : logicalrep_worker_attach(int slot)
                                768                 :                : {
                                769                 :                :     /* Block concurrent access. */
                                770                 :            592 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                771                 :                : 
                                772   [ +  -  -  + ]:            592 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
                                773                 :            592 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
                                774                 :                : 
 3296                           775         [ -  + ]:            592 :     if (!MyLogicalRepWorker->in_use)
                                776                 :                :     {
 3296 peter_e@gmx.net           777                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                778         [ #  # ]:              0 :         ereport(ERROR,
                                779                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                780                 :                :                  errmsg("logical replication worker slot %d is empty, cannot attach",
                                781                 :                :                         slot)));
                                782                 :                :     }
                                783                 :                : 
 3393 peter_e@gmx.net           784         [ -  + ]:CBC         592 :     if (MyLogicalRepWorker->proc)
                                785                 :                :     {
 3296 peter_e@gmx.net           786                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3393                           787         [ #  # ]:              0 :         ereport(ERROR,
                                788                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                789                 :                :                  errmsg("logical replication worker slot %d is already used by "
                                790                 :                :                         "another worker, cannot attach", slot)));
                                791                 :                :     }
                                792                 :                : 
 3393 peter_e@gmx.net           793                 :CBC         592 :     MyLogicalRepWorker->proc = MyProc;
                                794                 :            592 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
                                795                 :                : 
                                796                 :            592 :     LWLockRelease(LogicalRepWorkerLock);
                                797                 :            592 : }
                                798                 :                : 
                                799                 :                : /*
                                800                 :                :  * Stop the parallel apply workers if any, and detach the leader apply worker
                                801                 :                :  * (cleans up the worker info).
                                802                 :                :  */
                                803                 :                : static void
                                804                 :            592 : logicalrep_worker_detach(void)
                                805                 :                : {
                                806                 :                :     /* Stop the parallel apply workers. */
 1212 akapila@postgresql.o      807         [ +  + ]:            592 :     if (am_leader_apply_worker())
                                808                 :                :     {
                                809                 :                :         List       *workers;
                                810                 :                :         ListCell   *lc;
                                811                 :                : 
                                812                 :                :         /*
                                813                 :                :          * Detach from the error_mq_handle for all parallel apply workers
                                814                 :                :          * before terminating them. This prevents the leader apply worker from
                                815                 :                :          * receiving the worker termination message and sending it to logs
                                816                 :                :          * when the same is already done by the parallel worker.
                                817                 :                :          */
                                818                 :            356 :         pa_detach_all_error_mq();
                                819                 :                : 
                                820                 :            356 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                821                 :                : 
  650                           822                 :            356 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
 1212                           823   [ +  -  +  +  :            717 :         foreach(lc, workers)
                                              +  + ]
                                824                 :                :         {
                                825                 :            361 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
                                826                 :                : 
                                827   [ +  -  +  + ]:            361 :             if (isParallelApplyWorker(w))
                                828                 :              4 :                 logicalrep_worker_stop_internal(w, SIGTERM);
                                829                 :                :         }
                                830                 :                : 
                                831                 :            356 :         LWLockRelease(LogicalRepWorkerLock);
                                832                 :                : 
  276 tgl@sss.pgh.pa.us         833                 :GNC         356 :         list_free(workers);
                                834                 :                :     }
                                835                 :                : 
                                836                 :                :     /* Block concurrent access. */
 3393 peter_e@gmx.net           837                 :CBC         592 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                838                 :                : 
 3296                           839                 :            592 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
                                840                 :                : 
 3393                           841                 :            592 :     LWLockRelease(LogicalRepWorkerLock);
                                842                 :            592 : }
                                843                 :                : 
                                844                 :                : /*
                                845                 :                :  * Clean up worker info.
                                846                 :                :  */
                                847                 :                : static void
 3296                           848                 :            592 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
                                849                 :                : {
                                850         [ -  + ]:            592 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
                                851                 :                : 
  984 akapila@postgresql.o      852                 :            592 :     worker->type = WORKERTYPE_UNKNOWN;
 3296 peter_e@gmx.net           853                 :            592 :     worker->in_use = false;
                                854                 :            592 :     worker->proc = NULL;
                                855                 :            592 :     worker->dbid = InvalidOid;
                                856                 :            592 :     worker->userid = InvalidOid;
                                857                 :            592 :     worker->subid = InvalidOid;
                                858                 :            592 :     worker->relid = InvalidOid;
 1203 akapila@postgresql.o      859                 :            592 :     worker->leader_pid = InvalidPid;
 1212                           860                 :            592 :     worker->parallel_apply = false;
 3296 peter_e@gmx.net           861                 :            592 : }
                                862                 :                : 
                                863                 :                : /*
                                864                 :                :  * Cleanup function for logical replication launcher.
                                865                 :                :  *
                                866                 :                :  * Called on logical replication launcher exit.
                                867                 :                :  */
                                868                 :                : static void
 3303 fujii@postgresql.org      869                 :            513 : logicalrep_launcher_onexit(int code, Datum arg)
                                870                 :                : {
                                871                 :            513 :     LogicalRepCtx->launcher_pid = 0;
                                872                 :            513 : }
                                873                 :                : 
                                874                 :                : /*
                                875                 :                :  * Reset the last_seqsync_start_time of the sequencesync worker in the
                                876                 :                :  * subscription's apply worker.
                                877                 :                :  *
                                878                 :                :  * Note that this value is not stored in the sequencesync worker, because that
                                879                 :                :  * has finished already and is about to exit.
                                880                 :                :  */
                                881                 :                : void
  181 akapila@postgresql.o      882                 :GNC           5 : logicalrep_reset_seqsync_start_time(void)
                                883                 :                : {
                                884                 :                :     LogicalRepWorker *worker;
                                885                 :                : 
                                886                 :                :     /*
                                887                 :                :      * The apply worker can't access last_seqsync_start_time concurrently, so
                                888                 :                :      * it is okay to use SHARED lock here. See ProcessSequencesForSync().
                                889                 :                :      */
                                890                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                891                 :                : 
                                892                 :              5 :     worker = logicalrep_worker_find(WORKERTYPE_APPLY,
                                893                 :              5 :                                     MyLogicalRepWorker->subid, InvalidOid,
                                894                 :                :                                     true);
                                895         [ +  - ]:              5 :     if (worker)
                                896                 :              5 :         worker->last_seqsync_start_time = 0;
                                897                 :                : 
                                898                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
                                899                 :              5 : }
                                900                 :                : 
                                901                 :                : /*
                                902                 :                :  * Cleanup function.
                                903                 :                :  *
                                904                 :                :  * Called on logical replication worker exit.
                                905                 :                :  */
                                906                 :                : static void
 3393 peter_e@gmx.net           907                 :CBC         592 : logicalrep_worker_onexit(int code, Datum arg)
                                908                 :                : {
                                909                 :                :     /* Disconnect gracefully from the remote side. */
 1819 alvherre@alvh.no-ip.      910         [ +  + ]:            592 :     if (LogRepWorkerWalRcvConn)
                                911                 :            471 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
                                912                 :                : 
 3393 peter_e@gmx.net           913                 :            592 :     logicalrep_worker_detach();
                                914                 :                : 
                                915                 :                :     /* Cleanup fileset used for streaming transactions. */
 1706 akapila@postgresql.o      916         [ +  + ]:            592 :     if (MyLogicalRepWorker->stream_fileset != NULL)
                                917                 :             14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
                                918                 :                : 
                                919                 :                :     /*
                                920                 :                :      * Session level locks may be acquired outside of a transaction in
                                921                 :                :      * parallel apply mode and will not be released when the worker
                                922                 :                :      * terminates, so manually release all locks before the worker exits.
                                923                 :                :      *
                                924                 :                :      * The locks will be acquired once the worker is initialized.
                                925                 :                :      */
 1098                           926         [ +  + ]:            592 :     if (!InitializingApplyWorker)
                                927                 :            520 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
                                928                 :                : 
 3260 peter_e@gmx.net           929                 :            592 :     ApplyLauncherWakeup();
 3393                           930                 :            592 : }
                                931                 :                : 
                                932                 :                : /*
                                933                 :                :  * Count the number of registered (not necessarily running) sync workers
                                934                 :                :  * for a subscription.
                                935                 :                :  */
                                936                 :                : int
 3330                           937                 :           1384 : logicalrep_sync_worker_count(Oid subid)
                                938                 :                : {
                                939                 :                :     int         i;
 3275 bruce@momjian.us          940                 :           1384 :     int         res = 0;
                                941                 :                : 
 3330 peter_e@gmx.net           942         [ -  + ]:           1384 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                943                 :                : 
                                944                 :                :     /* Search for attached worker for a given subscription id. */
                                945         [ +  + ]:           7126 :     for (i = 0; i < max_logical_replication_workers; i++)
                                946                 :                :     {
 3275 bruce@momjian.us          947                 :           5742 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                948                 :                : 
  181 akapila@postgresql.o      949   [ +  +  +  -  :GNC        5742 :         if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
                                     +  +  +  -  -  
                                                 + ]
 3330 peter_e@gmx.net           950                 :CBC        1514 :             res++;
                                951                 :                :     }
                                952                 :                : 
                                953                 :           1384 :     return res;
                                954                 :                : }
                                955                 :                : 
                                956                 :                : /*
                                957                 :                :  * Count the number of registered (but not necessarily running) parallel apply
                                958                 :                :  * workers for a subscription.
                                959                 :                :  */
                                960                 :                : static int
 1212 akapila@postgresql.o      961                 :            457 : logicalrep_pa_worker_count(Oid subid)
                                962                 :                : {
                                963                 :                :     int         i;
                                964                 :            457 :     int         res = 0;
                                965                 :                : 
                                966         [ -  + ]:            457 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                967                 :                : 
                                968                 :                :     /*
                                969                 :                :      * Scan all attached parallel apply workers, only counting those which
                                970                 :                :      * have the given subscription id.
                                971                 :                :      */
                                972         [ +  + ]:           2409 :     for (i = 0; i < max_logical_replication_workers; i++)
                                973                 :                :     {
                                974                 :           1952 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                975                 :                : 
  984                           976   [ +  +  +  +  :           1952 :         if (isParallelApplyWorker(w) && w->subid == subid)
                                              +  - ]
 1212                           977                 :              2 :             res++;
                                978                 :                :     }
                                979                 :                : 
                                980                 :            457 :     return res;
                                981                 :                : }
                                982                 :                : 
                                983                 :                : /*
                                984                 :                :  * ApplyLauncherShmemRequest
                                985                 :                :  *      Register shared memory space needed for replication launcher
                                986                 :                :  */
                                987                 :                : static void
   29 heikki.linnakangas@i      988                 :GNC        1244 : ApplyLauncherShmemRequest(void *arg)
                                989                 :                : {
                                990                 :                :     Size        size;
                                991                 :                : 
                                992                 :                :     /*
                                993                 :                :      * Need the fixed struct and the array of LogicalRepWorker.
                                994                 :                :      */
 3393 peter_e@gmx.net           995                 :CBC        1244 :     size = sizeof(LogicalRepCtxStruct);
                                996                 :           1244 :     size = MAXALIGN(size);
                                997                 :           1244 :     size = add_size(size, mul_size(max_logical_replication_workers,
                                998                 :                :                                    sizeof(LogicalRepWorker)));
   29 heikki.linnakangas@i      999                 :GNC        1244 :     ShmemRequestStruct(.name = "Logical Replication Launcher Data",
                               1000                 :                :                        .size = size,
                               1001                 :                :                        .ptr = (void **) &LogicalRepCtx,
                               1002                 :                :         );
 3393 peter_e@gmx.net          1003                 :GIC        1244 : }
                               1004                 :                : 
                               1005                 :                : /*
                               1006                 :                :  * ApplyLauncherRegister
                               1007                 :                :  *      Register a background worker running the logical replication launcher.
                               1008                 :                :  */
                               1009                 :                : void
 3393 peter_e@gmx.net          1010                 :CBC        1004 : ApplyLauncherRegister(void)
                               1011                 :                : {
                               1012                 :                :     BackgroundWorker bgw;
                               1013                 :                : 
                               1014                 :                :     /*
                               1015                 :                :      * The logical replication launcher is disabled during binary upgrades, to
                               1016                 :                :      * prevent logical replication workers from running on the source cluster.
                               1017                 :                :      * That could cause replication origins to move forward after having been
                               1018                 :                :      * copied to the target cluster, potentially creating conflicts with the
                               1019                 :                :      * copied data files.
                               1020                 :                :      */
  844 michael@paquier.xyz      1021   [ +  +  +  + ]:           1004 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
 3393 peter_e@gmx.net          1022                 :             61 :         return;
                               1023                 :                : 
 3306 tgl@sss.pgh.pa.us        1024                 :            943 :     memset(&bgw, 0, sizeof(bgw));
 3275 bruce@momjian.us         1025                 :            943 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                               1026                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3393 peter_e@gmx.net          1027                 :            943 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
 1037 nathan@postgresql.or     1028                 :            943 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 3322 rhaas@postgresql.org     1029                 :            943 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 3393 peter_e@gmx.net          1030                 :            943 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
                               1031                 :                :              "logical replication launcher");
 3169                          1032                 :            943 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
                               1033                 :                :              "logical replication launcher");
 3393                          1034                 :            943 :     bgw.bgw_restart_time = 5;
                               1035                 :            943 :     bgw.bgw_notify_pid = 0;
                               1036                 :            943 :     bgw.bgw_main_arg = (Datum) 0;
                               1037                 :                : 
                               1038                 :            943 :     RegisterBackgroundWorker(&bgw);
                               1039                 :                : }
                               1040                 :                : 
                               1041                 :                : /*
                               1042                 :                :  * ApplyLauncherShmemInit
                               1043                 :                :  *      Initialize replication launcher shared memory
                               1044                 :                :  */
                               1045                 :                : static void
   29 heikki.linnakangas@i     1046                 :GNC        1241 : ApplyLauncherShmemInit(void *arg)
                               1047                 :                : {
                               1048                 :                :     int         slot;
                               1049                 :                : 
                               1050                 :           1241 :     LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
                               1051                 :           1241 :     LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
                               1052                 :                : 
                               1053                 :                :     /* Initialize memory and spin locks for each worker slot. */
                               1054         [ +  + ]:           6168 :     for (slot = 0; slot < max_logical_replication_workers; slot++)
                               1055                 :                :     {
                               1056                 :           4927 :         LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
                               1057                 :                : 
                               1058                 :           4927 :         memset(worker, 0, sizeof(LogicalRepWorker));
                               1059                 :           4927 :         SpinLockInit(&worker->relmutex);
                               1060                 :                :     }
 3393 peter_e@gmx.net          1061                 :CBC        1241 : }
                               1062                 :                : 
                               1063                 :                : /*
                               1064                 :                :  * Initialize or attach to the dynamic shared hash table that stores the
                               1065                 :                :  * last-start times, if not already done.
                               1066                 :                :  * This must be called before accessing the table.
                               1067                 :                :  */
                               1068                 :                : static void
 1199 tgl@sss.pgh.pa.us        1069                 :            841 : logicalrep_launcher_attach_dshmem(void)
                               1070                 :                : {
                               1071                 :                :     MemoryContext oldcontext;
                               1072                 :                : 
                               1073                 :                :     /* Quick exit if we already did this. */
 1196                          1074         [ +  + ]:            841 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
 1199                          1075         [ +  + ]:            781 :         last_start_times != NULL)
                               1076                 :            575 :         return;
                               1077                 :                : 
                               1078                 :                :     /* Otherwise, use a lock to ensure only one process creates the table. */
                               1079                 :            266 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                               1080                 :                : 
                               1081                 :                :     /* Be sure any local memory allocated by DSA routines is persistent. */
                               1082                 :            266 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                               1083                 :                : 
 1196                          1084         [ +  + ]:            266 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
                               1085                 :                :     {
                               1086                 :                :         /* Initialize dynamic shared hash table for last-start times. */
 1199                          1087                 :             60 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
                               1088                 :             60 :         dsa_pin(last_start_times_dsa);
                               1089                 :             60 :         dsa_pin_mapping(last_start_times_dsa);
  799 nathan@postgresql.or     1090                 :             60 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
                               1091                 :                : 
                               1092                 :                :         /* Store handles in shared memory for other backends to use. */
 1199 tgl@sss.pgh.pa.us        1093                 :             60 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
                               1094                 :             60 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
                               1095                 :                :     }
                               1096         [ +  - ]:            206 :     else if (!last_start_times)
                               1097                 :                :     {
                               1098                 :                :         /* Attach to existing dynamic shared hash table. */
                               1099                 :            206 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
                               1100                 :            206 :         dsa_pin_mapping(last_start_times_dsa);
                               1101                 :            206 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
  333 nathan@postgresql.or     1102                 :            206 :                                          LogicalRepCtx->last_start_dsh, NULL);
                               1103                 :                :     }
                               1104                 :                : 
 1199 tgl@sss.pgh.pa.us        1105                 :            266 :     MemoryContextSwitchTo(oldcontext);
                               1106                 :            266 :     LWLockRelease(LogicalRepWorkerLock);
                               1107                 :                : }
                               1108                 :                : 
                               1109                 :                : /*
                               1110                 :                :  * Set the last-start time for the subscription.
                               1111                 :                :  */
                               1112                 :                : static void
                               1113                 :            223 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
                               1114                 :                : {
                               1115                 :                :     LauncherLastStartTimesEntry *entry;
                               1116                 :                :     bool        found;
                               1117                 :                : 
                               1118                 :            223 :     logicalrep_launcher_attach_dshmem();
                               1119                 :                : 
                               1120                 :            223 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
                               1121                 :            223 :     entry->last_start_time = start_time;
                               1122                 :            223 :     dshash_release_lock(last_start_times, entry);
                               1123                 :            223 : }
                               1124                 :                : 
                               1125                 :                : /*
                               1126                 :                :  * Return the last-start time for the subscription, or 0 if there isn't one.
                               1127                 :                :  */
                               1128                 :                : static TimestampTz
                               1129                 :            352 : ApplyLauncherGetWorkerStartTime(Oid subid)
                               1130                 :                : {
                               1131                 :                :     LauncherLastStartTimesEntry *entry;
                               1132                 :                :     TimestampTz ret;
                               1133                 :                : 
                               1134                 :            352 :     logicalrep_launcher_attach_dshmem();
                               1135                 :                : 
                               1136                 :            352 :     entry = dshash_find(last_start_times, &subid, false);
                               1137         [ +  + ]:            352 :     if (entry == NULL)
                               1138                 :            130 :         return 0;
                               1139                 :                : 
                               1140                 :            222 :     ret = entry->last_start_time;
                               1141                 :            222 :     dshash_release_lock(last_start_times, entry);
                               1142                 :                : 
                               1143                 :            222 :     return ret;
                               1144                 :                : }
                               1145                 :                : 
                               1146                 :                : /*
                               1147                 :                :  * Remove the last-start-time entry for the subscription, if one exists.
                               1148                 :                :  *
                               1149                 :                :  * This has two use-cases: to remove the entry related to a subscription
                               1150                 :                :  * that's been deleted or disabled (just to avoid leaking shared memory),
                               1151                 :                :  * and to allow immediate restart of an apply worker that has exited
                               1152                 :                :  * due to subscription parameter changes.
                               1153                 :                :  */
                               1154                 :                : void
                               1155                 :            266 : ApplyLauncherForgetWorkerStartTime(Oid subid)
                               1156                 :                : {
                               1157                 :            266 :     logicalrep_launcher_attach_dshmem();
                               1158                 :                : 
                               1159                 :            266 :     (void) dshash_delete_key(last_start_times, &subid);
                               1160                 :            266 : }
                               1161                 :                : 
                               1162                 :                : /*
                               1163                 :                :  * Wakeup the launcher on commit if requested.
                               1164                 :                :  */
                               1165                 :                : void
 3291 peter_e@gmx.net          1166                 :         423274 : AtEOXact_ApplyLauncher(bool isCommit)
                               1167                 :                : {
 3196                          1168         [ +  + ]:         423274 :     if (isCommit)
                               1169                 :                :     {
                               1170         [ +  + ]:         387624 :         if (on_commit_launcher_wakeup)
                               1171                 :            154 :             ApplyLauncherWakeup();
                               1172                 :                :     }
                               1173                 :                : 
 3291                          1174                 :         423274 :     on_commit_launcher_wakeup = false;
 3393                          1175                 :         423274 : }
                               1176                 :                : 
                               1177                 :                : /*
                               1178                 :                :  * Request wakeup of the launcher on commit of the transaction.
                               1179                 :                :  *
                               1180                 :                :  * This is used to send launcher signal to stop sleeping and process the
                               1181                 :                :  * subscriptions when current transaction commits. Should be used when new
                               1182                 :                :  * tuple was added to the pg_subscription catalog.
                               1183                 :                : */
                               1184                 :                : void
                               1185                 :            155 : ApplyLauncherWakeupAtCommit(void)
                               1186                 :                : {
 3375 heikki.linnakangas@i     1187         [ +  + ]:            155 :     if (!on_commit_launcher_wakeup)
                               1188                 :            154 :         on_commit_launcher_wakeup = true;
 3393 peter_e@gmx.net          1189                 :            155 : }
                               1190                 :                : 
                               1191                 :                : /*
                               1192                 :                :  * Wakeup the launcher immediately.
                               1193                 :                :  */
                               1194                 :                : void
                               1195                 :            783 : ApplyLauncherWakeup(void)
                               1196                 :                : {
 3303 fujii@postgresql.org     1197         [ +  + ]:            783 :     if (LogicalRepCtx->launcher_pid != 0)
 3393 peter_e@gmx.net          1198                 :            754 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
                               1199                 :            783 : }
                               1200                 :                : 
                               1201                 :                : /*
                               1202                 :                :  * Main loop for the apply launcher process.
                               1203                 :                :  */
                               1204                 :                : void
                               1205                 :            513 : ApplyLauncherMain(Datum main_arg)
                               1206                 :                : {
 3343 tgl@sss.pgh.pa.us        1207         [ +  + ]:            513 :     ereport(DEBUG1,
                               1208                 :                :             (errmsg_internal("logical replication launcher started")));
                               1209                 :                : 
 3303 fujii@postgresql.org     1210                 :            513 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
                               1211                 :                : 
 3253 andres@anarazel.de       1212         [ -  + ]:            513 :     Assert(LogicalRepCtx->launcher_pid == 0);
                               1213                 :            513 :     LogicalRepCtx->launcher_pid = MyProcPid;
                               1214                 :                : 
                               1215                 :                :     /* Establish signal handlers. */
 2118 akapila@postgresql.o     1216                 :            513 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
 3393 peter_e@gmx.net          1217                 :            513 :     BackgroundWorkerUnblockSignals();
                               1218                 :                : 
                               1219                 :                :     /*
                               1220                 :                :      * Establish connection to nailed catalogs (we only ever access
                               1221                 :                :      * pg_subscription).
                               1222                 :                :      */
 2952 magnus@hagander.net      1223                 :            513 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
                               1224                 :                : 
                               1225                 :                :     /*
                               1226                 :                :      * Acquire the conflict detection slot at startup to ensure it can be
                               1227                 :                :      * dropped if no longer needed after a restart.
                               1228                 :                :      */
  286 akapila@postgresql.o     1229                 :GNC         513 :     acquire_conflict_slot_if_exists();
                               1230                 :                : 
                               1231                 :                :     /* Enter main loop */
                               1232                 :                :     for (;;)
 3393 peter_e@gmx.net          1233                 :CBC        2700 :     {
                               1234                 :                :         int         rc;
                               1235                 :                :         List       *sublist;
                               1236                 :                :         ListCell   *lc;
                               1237                 :                :         MemoryContext subctx;
                               1238                 :                :         MemoryContext oldctx;
 3275 bruce@momjian.us         1239                 :           3213 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
  245 akapila@postgresql.o     1240                 :GNC        3213 :         bool        can_update_xmin = true;
  286                          1241                 :           3213 :         bool        retain_dead_tuples = false;
                               1242                 :           3213 :         TransactionId xmin = InvalidTransactionId;
                               1243                 :                : 
 3253 andres@anarazel.de       1244         [ +  + ]:CBC        3213 :         CHECK_FOR_INTERRUPTS();
                               1245                 :                : 
                               1246                 :                :         /* Use temporary context to avoid leaking memory across cycles. */
 1199 tgl@sss.pgh.pa.us        1247                 :           3212 :         subctx = AllocSetContextCreate(TopMemoryContext,
                               1248                 :                :                                        "Logical Replication Launcher sublist",
                               1249                 :                :                                        ALLOCSET_DEFAULT_SIZES);
                               1250                 :           3212 :         oldctx = MemoryContextSwitchTo(subctx);
                               1251                 :                : 
                               1252                 :                :         /*
                               1253                 :                :          * Start any missing workers for enabled subscriptions.
                               1254                 :                :          *
                               1255                 :                :          * Also, during the iteration through all subscriptions, we compute
                               1256                 :                :          * the minimum XID required to protect deleted tuples for conflict
                               1257                 :                :          * detection if one of the subscription enables retain_dead_tuples
                               1258                 :                :          * option.
                               1259                 :                :          */
                               1260                 :           3212 :         sublist = get_subscription_list();
                               1261   [ +  +  +  +  :           4180 :         foreach(lc, sublist)
                                              +  + ]
                               1262                 :                :         {
                               1263                 :            970 :             Subscription *sub = (Subscription *) lfirst(lc);
                               1264                 :                :             LogicalRepWorker *w;
                               1265                 :                :             TimestampTz last_start;
                               1266                 :                :             TimestampTz now;
                               1267                 :                :             long        elapsed;
                               1268                 :                : 
  286 akapila@postgresql.o     1269         [ +  + ]:GNC         970 :             if (sub->retaindeadtuples)
                               1270                 :                :             {
                               1271                 :             69 :                 retain_dead_tuples = true;
                               1272                 :                : 
                               1273                 :                :                 /*
                               1274                 :                :                  * Create a replication slot to retain information necessary
                               1275                 :                :                  * for conflict detection such as dead tuples, commit
                               1276                 :                :                  * timestamps, and origins.
                               1277                 :                :                  *
                               1278                 :                :                  * The slot is created before starting the apply worker to
                               1279                 :                :                  * prevent it from unnecessarily maintaining its
                               1280                 :                :                  * oldest_nonremovable_xid.
                               1281                 :                :                  *
                               1282                 :                :                  * The slot is created even for a disabled subscription to
                               1283                 :                :                  * ensure that conflict-related information is available when
                               1284                 :                :                  * applying remote changes that occurred before the
                               1285                 :                :                  * subscription was enabled.
                               1286                 :                :                  */
                               1287                 :             69 :                 CreateConflictDetectionSlot();
                               1288                 :                : 
  245                          1289         [ +  - ]:             69 :                 if (sub->retentionactive)
                               1290                 :                :                 {
                               1291                 :                :                     /*
                               1292                 :                :                      * Can't advance xmin of the slot unless all the
                               1293                 :                :                      * subscriptions actively retaining dead tuples are
                               1294                 :                :                      * enabled. This is required to ensure that we don't
                               1295                 :                :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
                               1296                 :                :                      * the subscriptions is not enabled. Otherwise, we won't
                               1297                 :                :                      * be able to detect conflicts reliably for such a
                               1298                 :                :                      * subscription even though it has set the
                               1299                 :                :                      * retain_dead_tuples option.
                               1300                 :                :                      */
                               1301                 :             69 :                     can_update_xmin &= sub->enabled;
                               1302                 :                : 
                               1303                 :                :                     /*
                               1304                 :                :                      * Initialize the slot once the subscription activates
                               1305                 :                :                      * retention.
                               1306                 :                :                      */
                               1307         [ -  + ]:             69 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
  245 akapila@postgresql.o     1308                 :UNC           0 :                         init_conflict_slot_xmin();
                               1309                 :                :                 }
                               1310                 :                :             }
                               1311                 :                : 
 1199 tgl@sss.pgh.pa.us        1312         [ +  + ]:CBC         970 :             if (!sub->enabled)
                               1313                 :             40 :                 continue;
                               1314                 :                : 
                               1315                 :            930 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
  189 akapila@postgresql.o     1316                 :GNC         930 :             w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
                               1317                 :                :                                        false);
                               1318                 :                : 
 1199 tgl@sss.pgh.pa.us        1319         [ +  + ]:CBC         930 :             if (w != NULL)
                               1320                 :                :             {
                               1321                 :                :                 /*
                               1322                 :                :                  * Compute the minimum xmin required to protect dead tuples
                               1323                 :                :                  * required for conflict detection among all running apply
                               1324                 :                :                  * workers. This computation is performed while holding
                               1325                 :                :                  * LogicalRepWorkerLock to prevent accessing invalid worker
                               1326                 :                :                  * data, in scenarios where a worker might exit and reset its
                               1327                 :                :                  * state concurrently.
                               1328                 :                :                  */
  245 akapila@postgresql.o     1329         [ +  + ]:GNC         578 :                 if (sub->retaindeadtuples &&
                               1330   [ +  -  +  - ]:             64 :                     sub->retentionactive &&
                               1331                 :                :                     can_update_xmin)
  286                          1332                 :             64 :                     compute_min_nonremovable_xid(w, &xmin);
                               1333                 :                : 
  232                          1334                 :            578 :                 LWLockRelease(LogicalRepWorkerLock);
                               1335                 :                : 
                               1336                 :                :                 /* worker is running already */
  286                          1337                 :            578 :                 continue;
                               1338                 :                :             }
                               1339                 :                : 
  232                          1340                 :            352 :             LWLockRelease(LogicalRepWorkerLock);
                               1341                 :                : 
                               1342                 :                :             /*
                               1343                 :                :              * Can't advance xmin of the slot unless all the workers
                               1344                 :                :              * corresponding to subscriptions actively retaining dead tuples
                               1345                 :                :              * are running, disabling the further computation of the minimum
                               1346                 :                :              * nonremovable xid.
                               1347                 :                :              */
  245                          1348   [ +  +  +  - ]:            352 :             if (sub->retaindeadtuples && sub->retentionactive)
                               1349                 :              2 :                 can_update_xmin = false;
                               1350                 :                : 
                               1351                 :                :             /*
                               1352                 :                :              * If the worker is eligible to start now, launch it.  Otherwise,
                               1353                 :                :              * adjust wait_time so that we'll wake up as soon as it can be
                               1354                 :                :              * started.
                               1355                 :                :              *
                               1356                 :                :              * Each subscription's apply worker can only be restarted once per
                               1357                 :                :              * wal_retrieve_retry_interval, so that errors do not cause us to
                               1358                 :                :              * repeatedly restart the worker as fast as possible.  In cases
                               1359                 :                :              * where a restart is expected (e.g., subscription parameter
                               1360                 :                :              * changes), another process should remove the last-start entry
                               1361                 :                :              * for the subscription so that the worker can be restarted
                               1362                 :                :              * without waiting for wal_retrieve_retry_interval to elapse.
                               1363                 :                :              */
 1199 tgl@sss.pgh.pa.us        1364                 :CBC         352 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
                               1365                 :            352 :             now = GetCurrentTimestamp();
                               1366         [ +  + ]:            352 :             if (last_start == 0 ||
                               1367         [ +  + ]:            222 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                               1368                 :                :             {
                               1369                 :            223 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
  315                          1370         [ +  + ]:            222 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                               1371                 :            223 :                                               sub->dbid, sub->oid, sub->name,
                               1372                 :                :                                               sub->owner, InvalidOid,
                               1373                 :                :                                               DSM_HANDLE_INVALID,
  245 akapila@postgresql.o     1374         [ +  + ]:GNC         225 :                                               sub->retaindeadtuples &&
                               1375         [ +  - ]:              2 :                                               sub->retentionactive))
                               1376                 :                :                 {
                               1377                 :                :                     /*
                               1378                 :                :                      * We get here either if we failed to launch a worker
                               1379                 :                :                      * (perhaps for resource-exhaustion reasons) or if we
                               1380                 :                :                      * launched one but it immediately quit.  Either way, it
                               1381                 :                :                      * seems appropriate to try again after
                               1382                 :                :                      * wal_retrieve_retry_interval.
                               1383                 :                :                      */
  315 tgl@sss.pgh.pa.us        1384                 :GBC           1 :                     wait_time = Min(wait_time,
                               1385                 :                :                                     wal_retrieve_retry_interval);
                               1386                 :                :                 }
                               1387                 :                :             }
                               1388                 :                :             else
                               1389                 :                :             {
 1199 tgl@sss.pgh.pa.us        1390                 :CBC         129 :                 wait_time = Min(wait_time,
                               1391                 :                :                                 wal_retrieve_retry_interval - elapsed);
                               1392                 :                :             }
                               1393                 :                :         }
                               1394                 :                : 
                               1395                 :                :         /*
                               1396                 :                :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
                               1397                 :                :          * that requires us to retain dead tuples. Otherwise, if required,
                               1398                 :                :          * advance the slot's xmin to protect dead tuples required for the
                               1399                 :                :          * conflict detection.
                               1400                 :                :          *
                               1401                 :                :          * Additionally, if all apply workers for subscriptions with
                               1402                 :                :          * retain_dead_tuples enabled have requested to stop retention, the
                               1403                 :                :          * slot's xmin will be set to InvalidTransactionId allowing the
                               1404                 :                :          * removal of dead tuples.
                               1405                 :                :          */
  286 akapila@postgresql.o     1406         [ +  + ]:GNC        3210 :         if (MyReplicationSlot)
                               1407                 :                :         {
                               1408         [ +  + ]:             70 :             if (!retain_dead_tuples)
                               1409                 :              1 :                 ReplicationSlotDropAcquired();
  245                          1410         [ +  + ]:             69 :             else if (can_update_xmin)
                               1411                 :             64 :                 update_conflict_slot_xmin(xmin);
                               1412                 :                :         }
                               1413                 :                : 
                               1414                 :                :         /* Switch back to original memory context. */
 1199 tgl@sss.pgh.pa.us        1415                 :CBC        3210 :         MemoryContextSwitchTo(oldctx);
                               1416                 :                :         /* Clean the temporary memory. */
                               1417                 :           3210 :         MemoryContextDelete(subctx);
                               1418                 :                : 
                               1419                 :                :         /* Wait for more work. */
 3255 andres@anarazel.de       1420                 :           3210 :         rc = WaitLatch(MyLatch,
                               1421                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1422                 :                :                        wait_time,
                               1423                 :                :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
                               1424                 :                : 
                               1425         [ +  + ]:           3207 :         if (rc & WL_LATCH_SET)
                               1426                 :                :         {
                               1427                 :           3181 :             ResetLatch(MyLatch);
                               1428         [ +  + ]:           3181 :             CHECK_FOR_INTERRUPTS();
                               1429                 :                :         }
                               1430                 :                : 
 2331 rhaas@postgresql.org     1431         [ +  + ]:           2700 :         if (ConfigReloadPending)
                               1432                 :                :         {
                               1433                 :             63 :             ConfigReloadPending = false;
 3312 peter_e@gmx.net          1434                 :             63 :             ProcessConfigFile(PGC_SIGHUP);
                               1435                 :                :         }
                               1436                 :                :     }
                               1437                 :                : 
                               1438                 :                :     /* Not reachable */
                               1439                 :                : }
                               1440                 :                : 
                               1441                 :                : /*
                               1442                 :                :  * Determine the minimum non-removable transaction ID across all apply workers
                               1443                 :                :  * for subscriptions that have retain_dead_tuples enabled. Store the result
                               1444                 :                :  * in *xmin.
                               1445                 :                :  */
                               1446                 :                : static void
  286 akapila@postgresql.o     1447                 :GNC          64 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
                               1448                 :                : {
                               1449                 :                :     TransactionId nonremovable_xid;
                               1450                 :                : 
                               1451         [ -  + ]:             64 :     Assert(worker != NULL);
                               1452                 :                : 
                               1453                 :                :     /*
                               1454                 :                :      * The replication slot for conflict detection must be created before the
                               1455                 :                :      * worker starts.
                               1456                 :                :      */
                               1457         [ -  + ]:             64 :     Assert(MyReplicationSlot);
                               1458                 :                : 
                               1459                 :             64 :     SpinLockAcquire(&worker->relmutex);
                               1460                 :             64 :     nonremovable_xid = worker->oldest_nonremovable_xid;
                               1461                 :             64 :     SpinLockRelease(&worker->relmutex);
                               1462                 :                : 
                               1463                 :                :     /*
                               1464                 :                :      * Return if the apply worker has stopped retention concurrently.
                               1465                 :                :      *
                               1466                 :                :      * Although this function is invoked only when retentionactive is true,
                               1467                 :                :      * the apply worker might stop retention after the launcher fetches the
                               1468                 :                :      * retentionactive flag.
                               1469                 :                :      */
  245                          1470         [ -  + ]:             64 :     if (!TransactionIdIsValid(nonremovable_xid))
  245 akapila@postgresql.o     1471                 :UNC           0 :         return;
                               1472                 :                : 
  286 akapila@postgresql.o     1473   [ -  +  -  - ]:GNC          64 :     if (!TransactionIdIsValid(*xmin) ||
  286 akapila@postgresql.o     1474                 :UNC           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
  286 akapila@postgresql.o     1475                 :GNC          64 :         *xmin = nonremovable_xid;
                               1476                 :                : }
                               1477                 :                : 
                               1478                 :                : /*
                               1479                 :                :  * Acquire the replication slot used to retain information for conflict
                               1480                 :                :  * detection, if it exists.
                               1481                 :                :  *
                               1482                 :                :  * Return true if successfully acquired, otherwise return false.
                               1483                 :                :  */
                               1484                 :                : static bool
                               1485                 :            513 : acquire_conflict_slot_if_exists(void)
                               1486                 :                : {
                               1487         [ +  + ]:            513 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
                               1488                 :            512 :         return false;
                               1489                 :                : 
                               1490                 :              1 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
                               1491                 :              1 :     return true;
                               1492                 :                : }
                               1493                 :                : 
                               1494                 :                : /*
                               1495                 :                :  * Update the xmin the replication slot used to retain information required
                               1496                 :                :  * for conflict detection.
                               1497                 :                :  */
                               1498                 :                : static void
  245                          1499                 :             64 : update_conflict_slot_xmin(TransactionId new_xmin)
                               1500                 :                : {
  286                          1501         [ -  + ]:             64 :     Assert(MyReplicationSlot);
  245                          1502   [ +  -  -  + ]:             64 :     Assert(!TransactionIdIsValid(new_xmin) ||
                               1503                 :                :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
                               1504                 :                : 
                               1505                 :                :     /* Return if the xmin value of the slot cannot be updated */
  286                          1506         [ +  + ]:             64 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
                               1507                 :             51 :         return;
                               1508                 :                : 
                               1509                 :             13 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1510                 :             13 :     MyReplicationSlot->effective_xmin = new_xmin;
                               1511                 :             13 :     MyReplicationSlot->data.xmin = new_xmin;
                               1512                 :             13 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1513                 :                : 
                               1514         [ -  + ]:             13 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
                               1515                 :                : 
                               1516                 :             13 :     ReplicationSlotMarkDirty();
                               1517                 :             13 :     ReplicationSlotsComputeRequiredXmin(false);
                               1518                 :                : 
                               1519                 :                :     /*
                               1520                 :                :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
                               1521                 :                :      * each time. This is acceptable because all concurrent transactions on
                               1522                 :                :      * the publisher that require the data preceding the slot's xmin should
                               1523                 :                :      * have already been applied and flushed on the subscriber before the xmin
                               1524                 :                :      * is advanced. So, even if the slot's xmin regresses after a restart, it
                               1525                 :                :      * will be advanced again in the next cycle. Therefore, no data required
                               1526                 :                :      * for conflict detection will be prematurely removed.
                               1527                 :                :      */
                               1528                 :             13 :     return;
                               1529                 :                : }
                               1530                 :                : 
                               1531                 :                : /*
                               1532                 :                :  * Initialize the xmin for the conflict detection slot.
                               1533                 :                :  */
                               1534                 :                : static void
  245                          1535                 :              4 : init_conflict_slot_xmin(void)
                               1536                 :                : {
                               1537                 :                :     TransactionId xmin_horizon;
                               1538                 :                : 
                               1539                 :                :     /* Replication slot must exist but shouldn't be initialized. */
                               1540   [ +  -  -  + ]:              4 :     Assert(MyReplicationSlot &&
                               1541                 :                :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
                               1542                 :                : 
  126 msawada@postgresql.o     1543                 :              4 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
  286 akapila@postgresql.o     1544                 :              4 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
                               1545                 :                : 
                               1546                 :              4 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
                               1547                 :                : 
                               1548                 :              4 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1549                 :              4 :     MyReplicationSlot->effective_xmin = xmin_horizon;
                               1550                 :              4 :     MyReplicationSlot->data.xmin = xmin_horizon;
                               1551                 :              4 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1552                 :                : 
                               1553                 :              4 :     ReplicationSlotsComputeRequiredXmin(true);
                               1554                 :                : 
                               1555                 :              4 :     LWLockRelease(ProcArrayLock);
  126 msawada@postgresql.o     1556                 :              4 :     LWLockRelease(ReplicationSlotControlLock);
                               1557                 :                : 
                               1558                 :                :     /* Write this slot to disk */
  286 akapila@postgresql.o     1559                 :              4 :     ReplicationSlotMarkDirty();
                               1560                 :              4 :     ReplicationSlotSave();
                               1561                 :              4 : }
                               1562                 :                : 
                               1563                 :                : /*
                               1564                 :                :  * Create and acquire the replication slot used to retain information for
                               1565                 :                :  * conflict detection, if not yet.
                               1566                 :                :  */
                               1567                 :                : void
  245                          1568                 :             70 : CreateConflictDetectionSlot(void)
                               1569                 :                : {
                               1570                 :                :     /* Exit early, if the replication slot is already created and acquired */
                               1571         [ +  + ]:             70 :     if (MyReplicationSlot)
                               1572                 :             66 :         return;
                               1573                 :                : 
                               1574         [ +  - ]:              4 :     ereport(LOG,
                               1575                 :                :             errmsg("creating replication conflict detection slot"));
                               1576                 :                : 
                               1577                 :              4 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
                               1578                 :                :                           false, false, false);
                               1579                 :                : 
                               1580                 :              4 :     init_conflict_slot_xmin();
                               1581                 :                : }
                               1582                 :                : 
                               1583                 :                : /*
                               1584                 :                :  * Is current process the logical replication launcher?
                               1585                 :                :  */
                               1586                 :                : bool
 3253 andres@anarazel.de       1587                 :CBC        2729 : IsLogicalLauncher(void)
                               1588                 :                : {
                               1589                 :           2729 :     return LogicalRepCtx->launcher_pid == MyProcPid;
                               1590                 :                : }
                               1591                 :                : 
                               1592                 :                : /*
                               1593                 :                :  * Return the pid of the leader apply worker if the given pid is the pid of a
                               1594                 :                :  * parallel apply worker, otherwise, return InvalidPid.
                               1595                 :                :  */
                               1596                 :                : pid_t
 1203 akapila@postgresql.o     1597                 :            864 : GetLeaderApplyWorkerPid(pid_t pid)
                               1598                 :                : {
                               1599                 :            864 :     int         leader_pid = InvalidPid;
                               1600                 :                :     int         i;
                               1601                 :                : 
                               1602                 :            864 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1603                 :                : 
                               1604         [ +  + ]:           4320 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1605                 :                :     {
                               1606                 :           3456 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                               1607                 :                : 
                               1608   [ +  +  -  +  :           3456 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
                                        -  -  -  - ]
                               1609                 :                :         {
 1203 akapila@postgresql.o     1610                 :UBC           0 :             leader_pid = w->leader_pid;
                               1611                 :              0 :             break;
                               1612                 :                :         }
                               1613                 :                :     }
                               1614                 :                : 
 1203 akapila@postgresql.o     1615                 :CBC         864 :     LWLockRelease(LogicalRepWorkerLock);
                               1616                 :                : 
                               1617                 :            864 :     return leader_pid;
                               1618                 :                : }
                               1619                 :                : 
                               1620                 :                : /*
                               1621                 :                :  * Returns state of the subscriptions.
                               1622                 :                :  */
                               1623                 :                : Datum
 3393 peter_e@gmx.net          1624                 :              1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
                               1625                 :                : {
                               1626                 :                : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
                               1627         [ -  + ]:              1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
                               1628                 :                :     int         i;
                               1629                 :              1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1630                 :                : 
 1295 michael@paquier.xyz      1631                 :              1 :     InitMaterializedSRF(fcinfo, 0);
                               1632                 :                : 
                               1633                 :                :     /* Make sure we get consistent view of the workers. */
 3393 peter_e@gmx.net          1634                 :              1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1635                 :                : 
 1428 tgl@sss.pgh.pa.us        1636         [ +  + ]:              5 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1637                 :                :     {
                               1638                 :                :         /* for each row */
 1389 peter@eisentraut.org     1639                 :              4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1640                 :              4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1641                 :                :         int         worker_pid;
                               1642                 :                :         LogicalRepWorker worker;
                               1643                 :                : 
 3393 peter_e@gmx.net          1644                 :              4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
                               1645                 :                :                sizeof(LogicalRepWorker));
                               1646   [ +  +  -  + ]:              4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
                               1647                 :              2 :             continue;
                               1648                 :                : 
                               1649   [ -  +  -  - ]:              2 :         if (OidIsValid(subid) && worker.subid != subid)
 3393 peter_e@gmx.net          1650                 :UBC           0 :             continue;
                               1651                 :                : 
 3393 peter_e@gmx.net          1652                 :CBC           2 :         worker_pid = worker.proc->pid;
                               1653                 :                : 
                               1654                 :              2 :         values[0] = ObjectIdGetDatum(worker.subid);
  181 akapila@postgresql.o     1655   [ +  -  -  + ]:GNC           2 :         if (isTableSyncWorker(&worker))
 3330 peter_e@gmx.net          1656                 :UBC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
                               1657                 :                :         else
 3330 peter_e@gmx.net          1658                 :CBC           2 :             nulls[1] = true;
                               1659                 :              2 :         values[2] = Int32GetDatum(worker_pid);
                               1660                 :                : 
 1203 akapila@postgresql.o     1661   [ +  -  -  + ]:              2 :         if (isParallelApplyWorker(&worker))
 1203 akapila@postgresql.o     1662                 :UBC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
                               1663                 :                :         else
 3330 peter_e@gmx.net          1664                 :CBC           2 :             nulls[3] = true;
                               1665                 :                : 
  180 alvherre@kurilemu.de     1666         [ -  + ]:GNC           2 :         if (!XLogRecPtrIsValid(worker.last_lsn))
 1203 akapila@postgresql.o     1667                 :UBC           0 :             nulls[4] = true;
                               1668                 :                :         else
 1203 akapila@postgresql.o     1669                 :CBC           2 :             values[4] = LSNGetDatum(worker.last_lsn);
 3393 peter_e@gmx.net          1670         [ -  + ]:              2 :         if (worker.last_send_time == 0)
 1203 akapila@postgresql.o     1671                 :UBC           0 :             nulls[5] = true;
                               1672                 :                :         else
 1203 akapila@postgresql.o     1673                 :CBC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
 3393 peter_e@gmx.net          1674         [ -  + ]:              2 :         if (worker.last_recv_time == 0)
 1203 akapila@postgresql.o     1675                 :UBC           0 :             nulls[6] = true;
                               1676                 :                :         else
 1203 akapila@postgresql.o     1677                 :CBC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
  180 alvherre@kurilemu.de     1678         [ -  + ]:GNC           2 :         if (!XLogRecPtrIsValid(worker.reply_lsn))
 1203 akapila@postgresql.o     1679                 :UBC           0 :             nulls[7] = true;
                               1680                 :                :         else
 1203 akapila@postgresql.o     1681                 :CBC           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
 3393 peter_e@gmx.net          1682         [ -  + ]:              2 :         if (worker.reply_time == 0)
 1203 akapila@postgresql.o     1683                 :UBC           0 :             nulls[8] = true;
                               1684                 :                :         else
 1203 akapila@postgresql.o     1685                 :CBC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
                               1686                 :                : 
  953 nathan@postgresql.or     1687   [ +  -  -  -  :              2 :         switch (worker.type)
                                              -  - ]
                               1688                 :                :         {
                               1689                 :              2 :             case WORKERTYPE_APPLY:
                               1690                 :              2 :                 values[9] = CStringGetTextDatum("apply");
                               1691                 :              2 :                 break;
  953 nathan@postgresql.or     1692                 :UBC           0 :             case WORKERTYPE_PARALLEL_APPLY:
                               1693                 :              0 :                 values[9] = CStringGetTextDatum("parallel apply");
                               1694                 :              0 :                 break;
  181 akapila@postgresql.o     1695                 :UNC           0 :             case WORKERTYPE_SEQUENCESYNC:
                               1696                 :              0 :                 values[9] = CStringGetTextDatum("sequence synchronization");
                               1697                 :              0 :                 break;
  953 nathan@postgresql.or     1698                 :UBC           0 :             case WORKERTYPE_TABLESYNC:
                               1699                 :              0 :                 values[9] = CStringGetTextDatum("table synchronization");
                               1700                 :              0 :                 break;
                               1701                 :              0 :             case WORKERTYPE_UNKNOWN:
                               1702                 :                :                 /* Should never happen. */
                               1703         [ #  # ]:              0 :                 elog(ERROR, "unknown worker type");
                               1704                 :                :         }
                               1705                 :                : 
 1520 michael@paquier.xyz      1706                 :CBC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1707                 :                :                              values, nulls);
                               1708                 :                : 
                               1709                 :                :         /*
                               1710                 :                :          * If only a single subscription was requested, and we found it,
                               1711                 :                :          * break.
                               1712                 :                :          */
 3393 peter_e@gmx.net          1713         [ -  + ]:              2 :         if (OidIsValid(subid))
 3393 peter_e@gmx.net          1714                 :UBC           0 :             break;
                               1715                 :                :     }
                               1716                 :                : 
 3393 peter_e@gmx.net          1717                 :CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
                               1718                 :                : 
                               1719                 :              1 :     return (Datum) 0;
                               1720                 :                : }
        

Generated by: LCOV version 2.5.0-beta