LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - launcher.c (source / functions) Coverage Total Hit UNC LBC UBC GBC GNC CBC DCB
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 88.7 % 566 502 5 1 58 74 428 1
Current Date: 2025-09-06 07:49:51 +0900 Functions: 100.0 % 36 36 9 27
Baseline: lcov-20250906-005545-baseline Branches: 65.4 % 384 251 24 2 107 3 44 204
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(1,7] days: 92.3 % 26 24 2 24
(30,360] days: 92.5 % 67 62 3 1 1 50 12
(360..) days: 87.9 % 473 416 57 416
Function coverage date bins:
(1,7] days: 100.0 % 3 3 3
(30,360] days: 100.0 % 2 2 2
(360..) days: 100.0 % 31 31 4 27
Branch coverage date bins:
(1,7] days: 64.7 % 34 22 12 22
(30,360] days: 62.5 % 40 25 12 1 2 22 3
(360..) days: 65.8 % 310 204 1 105 3 201

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * launcher.c
                                  3                 :                :  *     PostgreSQL logical replication worker launcher process
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2016-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/launcher.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This module contains the logical replication worker launcher which
                                 12                 :                :  *    uses the background worker infrastructure to start the logical
                                 13                 :                :  *    replication workers for every enabled subscription.
                                 14                 :                :  *
                                 15                 :                :  *-------------------------------------------------------------------------
                                 16                 :                :  */
                                 17                 :                : 
                                 18                 :                : #include "postgres.h"
                                 19                 :                : 
                                 20                 :                : #include "access/heapam.h"
                                 21                 :                : #include "access/htup.h"
                                 22                 :                : #include "access/htup_details.h"
                                 23                 :                : #include "access/tableam.h"
                                 24                 :                : #include "access/xact.h"
                                 25                 :                : #include "catalog/pg_subscription.h"
                                 26                 :                : #include "catalog/pg_subscription_rel.h"
                                 27                 :                : #include "funcapi.h"
                                 28                 :                : #include "lib/dshash.h"
                                 29                 :                : #include "miscadmin.h"
                                 30                 :                : #include "pgstat.h"
                                 31                 :                : #include "postmaster/bgworker.h"
                                 32                 :                : #include "postmaster/interrupt.h"
                                 33                 :                : #include "replication/logicallauncher.h"
                                 34                 :                : #include "replication/origin.h"
                                 35                 :                : #include "replication/slot.h"
                                 36                 :                : #include "replication/walreceiver.h"
                                 37                 :                : #include "replication/worker_internal.h"
                                 38                 :                : #include "storage/ipc.h"
                                 39                 :                : #include "storage/proc.h"
                                 40                 :                : #include "storage/procarray.h"
                                 41                 :                : #include "tcop/tcopprot.h"
                                 42                 :                : #include "utils/builtins.h"
                                 43                 :                : #include "utils/memutils.h"
                                 44                 :                : #include "utils/pg_lsn.h"
                                 45                 :                : #include "utils/snapmgr.h"
                                 46                 :                : #include "utils/syscache.h"
                                 47                 :                : 
                                 48                 :                : /* max sleep time between cycles (3min) */
                                 49                 :                : #define DEFAULT_NAPTIME_PER_CYCLE 180000L
                                 50                 :                : 
                                 51                 :                : /* GUC variables */
                                 52                 :                : int         max_logical_replication_workers = 4;
                                 53                 :                : int         max_sync_workers_per_subscription = 2;
                                 54                 :                : int         max_parallel_apply_workers_per_subscription = 2;
                                 55                 :                : 
                                 56                 :                : LogicalRepWorker *MyLogicalRepWorker = NULL;
                                 57                 :                : 
                                 58                 :                : typedef struct LogicalRepCtxStruct
                                 59                 :                : {
                                 60                 :                :     /* Supervisor process. */
                                 61                 :                :     pid_t       launcher_pid;
                                 62                 :                : 
                                 63                 :                :     /* Hash table holding last start times of subscriptions' apply workers. */
                                 64                 :                :     dsa_handle  last_start_dsa;
                                 65                 :                :     dshash_table_handle last_start_dsh;
                                 66                 :                : 
                                 67                 :                :     /* Background workers. */
                                 68                 :                :     LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
                                 69                 :                : } LogicalRepCtxStruct;
                                 70                 :                : 
                                 71                 :                : static LogicalRepCtxStruct *LogicalRepCtx;
                                 72                 :                : 
                                 73                 :                : /* an entry in the last-start-times shared hash table */
                                 74                 :                : typedef struct LauncherLastStartTimesEntry
                                 75                 :                : {
                                 76                 :                :     Oid         subid;          /* OID of logrep subscription (hash key) */
                                 77                 :                :     TimestampTz last_start_time;    /* last time its apply worker was started */
                                 78                 :                : } LauncherLastStartTimesEntry;
                                 79                 :                : 
                                 80                 :                : /* parameters for the last-start-times shared hash table */
                                 81                 :                : static const dshash_parameters dsh_params = {
                                 82                 :                :     sizeof(Oid),
                                 83                 :                :     sizeof(LauncherLastStartTimesEntry),
                                 84                 :                :     dshash_memcmp,
                                 85                 :                :     dshash_memhash,
                                 86                 :                :     dshash_memcpy,
                                 87                 :                :     LWTRANCHE_LAUNCHER_HASH
                                 88                 :                : };
                                 89                 :                : 
                                 90                 :                : static dsa_area *last_start_times_dsa = NULL;
                                 91                 :                : static dshash_table *last_start_times = NULL;
                                 92                 :                : 
                                 93                 :                : static bool on_commit_launcher_wakeup = false;
                                 94                 :                : 
                                 95                 :                : 
                                 96                 :                : static void logicalrep_launcher_onexit(int code, Datum arg);
                                 97                 :                : static void logicalrep_worker_onexit(int code, Datum arg);
                                 98                 :                : static void logicalrep_worker_detach(void);
                                 99                 :                : static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
                                100                 :                : static int  logicalrep_pa_worker_count(Oid subid);
                                101                 :                : static void logicalrep_launcher_attach_dshmem(void);
                                102                 :                : static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
                                103                 :                : static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
                                104                 :                : static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
                                105                 :                : static bool acquire_conflict_slot_if_exists(void);
                                106                 :                : static void update_conflict_slot_xmin(TransactionId new_xmin);
                                107                 :                : static void init_conflict_slot_xmin(void);
                                108                 :                : 
                                109                 :                : 
                                110                 :                : /*
                                111                 :                :  * Load the list of subscriptions.
                                112                 :                :  *
                                113                 :                :  * Only the fields interesting for worker start/stop functions are filled for
                                114                 :                :  * each subscription.
                                115                 :                :  */
                                116                 :                : static List *
 3152 peter_e@gmx.net           117                 :CBC        2669 : get_subscription_list(void)
                                118                 :                : {
                                119                 :           2669 :     List       *res = NIL;
                                120                 :                :     Relation    rel;
                                121                 :                :     TableScanDesc scan;
                                122                 :                :     HeapTuple   tup;
                                123                 :                :     MemoryContext resultcxt;
                                124                 :                : 
                                125                 :                :     /* This is the context that we will allocate our output data in */
                                126                 :           2669 :     resultcxt = CurrentMemoryContext;
                                127                 :                : 
                                128                 :                :     /*
                                129                 :                :      * Start a transaction so we can access pg_subscription.
                                130                 :                :      */
                                131                 :           2669 :     StartTransactionCommand();
                                132                 :                : 
 2420 andres@anarazel.de        133                 :           2669 :     rel = table_open(SubscriptionRelationId, AccessShareLock);
 2371                           134                 :           2669 :     scan = table_beginscan_catalog(rel, 0, NULL);
                                135                 :                : 
 3152 peter_e@gmx.net           136         [ +  + ]:           3491 :     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
                                137                 :                :     {
                                138                 :            822 :         Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
                                139                 :                :         Subscription *sub;
                                140                 :                :         MemoryContext oldcxt;
                                141                 :                : 
                                142                 :                :         /*
                                143                 :                :          * Allocate our results in the caller's context, not the
                                144                 :                :          * transaction's. We do this inside the loop, and restore the original
                                145                 :                :          * context at the end, so that leaky things like heap_getnext() are
                                146                 :                :          * not called in a potentially long-lived context.
                                147                 :                :          */
                                148                 :            822 :         oldcxt = MemoryContextSwitchTo(resultcxt);
                                149                 :                : 
 3067                           150                 :            822 :         sub = (Subscription *) palloc0(sizeof(Subscription));
 2482 andres@anarazel.de        151                 :            822 :         sub->oid = subform->oid;
 3152 peter_e@gmx.net           152                 :            822 :         sub->dbid = subform->subdbid;
                                153                 :            822 :         sub->owner = subform->subowner;
                                154                 :            822 :         sub->enabled = subform->subenabled;
                                155                 :            822 :         sub->name = pstrdup(NameStr(subform->subname));
   45 akapila@postgresql.o      156                 :GNC         822 :         sub->retaindeadtuples = subform->subretaindeadtuples;
    4                           157                 :            822 :         sub->retentionactive = subform->subretentionactive;
                                158                 :                :         /* We don't fill fields we are not interested in. */
                                159                 :                : 
 3152 peter_e@gmx.net           160                 :CBC         822 :         res = lappend(res, sub);
                                161                 :            822 :         MemoryContextSwitchTo(oldcxt);
                                162                 :                :     }
                                163                 :                : 
 2371 andres@anarazel.de        164                 :           2669 :     table_endscan(scan);
 2420                           165                 :           2669 :     table_close(rel, AccessShareLock);
                                166                 :                : 
 3152 peter_e@gmx.net           167                 :           2669 :     CommitTransactionCommand();
                                168                 :                : 
                                169                 :           2669 :     return res;
                                170                 :                : }
                                171                 :                : 
                                172                 :                : /*
                                173                 :                :  * Wait for a background worker to start up and attach to the shmem context.
                                174                 :                :  *
                                175                 :                :  * This is only needed for cleaning up the shared memory in case the worker
                                176                 :                :  * fails to attach.
                                177                 :                :  *
                                178                 :                :  * Returns whether the attach was successful.
                                179                 :                :  */
                                180                 :                : static bool
                                181                 :            410 : WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                182                 :                :                                uint16 generation,
                                183                 :                :                                BackgroundWorkerHandle *handle)
                                184                 :                : {
   74 tgl@sss.pgh.pa.us         185                 :            410 :     bool        result = false;
                                186                 :            410 :     bool        dropped_latch = false;
                                187                 :                : 
                                188                 :                :     for (;;)
 3152 peter_e@gmx.net           189                 :           1113 :     {
                                190                 :                :         BgwHandleStatus status;
                                191                 :                :         pid_t       pid;
                                192                 :                :         int         rc;
                                193                 :                : 
                                194         [ +  + ]:           1523 :         CHECK_FOR_INTERRUPTS();
                                195                 :                : 
 3055                           196                 :           1522 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                197                 :                : 
                                198                 :                :         /* Worker either died or has started. Return false if died. */
                                199   [ +  -  +  + ]:           1522 :         if (!worker->in_use || worker->proc)
                                200                 :                :         {
   74 tgl@sss.pgh.pa.us         201                 :            405 :             result = worker->in_use;
 3055 peter_e@gmx.net           202                 :            405 :             LWLockRelease(LogicalRepWorkerLock);
   74 tgl@sss.pgh.pa.us         203                 :            405 :             break;
                                204                 :                :         }
                                205                 :                : 
 3055 peter_e@gmx.net           206                 :           1117 :         LWLockRelease(LogicalRepWorkerLock);
                                207                 :                : 
                                208                 :                :         /* Check if worker has died before attaching, and clean up after it. */
 3152                           209                 :           1117 :         status = GetBackgroundWorkerPid(handle, &pid);
                                210                 :                : 
                                211         [ -  + ]:           1117 :         if (status == BGWH_STOPPED)
                                212                 :                :         {
 3055 peter_e@gmx.net           213                 :UBC           0 :             LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                214                 :                :             /* Ensure that this was indeed the worker we waited for. */
                                215         [ #  # ]:              0 :             if (generation == worker->generation)
                                216                 :              0 :                 logicalrep_worker_cleanup(worker);
                                217                 :              0 :             LWLockRelease(LogicalRepWorkerLock);
   74 tgl@sss.pgh.pa.us         218                 :              0 :             break;              /* result is already false */
                                219                 :                :         }
                                220                 :                : 
                                221                 :                :         /*
                                222                 :                :          * We need timeout because we generally don't get notified via latch
                                223                 :                :          * about the worker attach.  But we don't expect to have to wait long.
                                224                 :                :          */
 3152 peter_e@gmx.net           225                 :CBC        1117 :         rc = WaitLatch(MyLatch,
                                226                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                227                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                228                 :                : 
 3014 andres@anarazel.de        229         [ +  + ]:           1117 :         if (rc & WL_LATCH_SET)
                                230                 :                :         {
                                231                 :            467 :             ResetLatch(MyLatch);
                                232         [ +  + ]:            467 :             CHECK_FOR_INTERRUPTS();
   74 tgl@sss.pgh.pa.us         233                 :            463 :             dropped_latch = true;
                                234                 :                :         }
                                235                 :                :     }
                                236                 :                : 
                                237                 :                :     /*
                                238                 :                :      * If we had to clear a latch event in order to wait, be sure to restore
                                239                 :                :      * it before exiting.  Otherwise caller may miss events.
                                240                 :                :      */
                                241         [ +  - ]:            405 :     if (dropped_latch)
                                242                 :            405 :         SetLatch(MyLatch);
                                243                 :                : 
                                244                 :            405 :     return result;
                                245                 :                : }
                                246                 :                : 
                                247                 :                : /*
                                248                 :                :  * Walks the workers array and searches for one that matches given
                                249                 :                :  * subscription id and relid.
                                250                 :                :  *
                                251                 :                :  * We are only interested in the leader apply worker or table sync worker.
                                252                 :                :  */
                                253                 :                : LogicalRepWorker *
 3089 peter_e@gmx.net           254                 :           3120 : logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
                                255                 :                : {
                                256                 :                :     int         i;
 3034 bruce@momjian.us          257                 :           3120 :     LogicalRepWorker *res = NULL;
                                258                 :                : 
 3152 peter_e@gmx.net           259         [ -  + ]:           3120 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                260                 :                : 
                                261                 :                :     /* Search for attached worker for a given subscription id. */
                                262         [ +  + ]:           9624 :     for (i = 0; i < max_logical_replication_workers; i++)
                                263                 :                :     {
 3034 bruce@momjian.us          264                 :           8402 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                265                 :                : 
                                266                 :                :         /* Skip parallel apply workers. */
  971 akapila@postgresql.o      267   [ +  +  -  + ]:           8402 :         if (isParallelApplyWorker(w))
  971 akapila@postgresql.o      268                 :UBC           0 :             continue;
                                269                 :                : 
 3055 peter_e@gmx.net           270   [ +  +  +  +  :CBC        8402 :         if (w->in_use && w->subid == subid && w->relid == relid &&
                                              +  + ]
                                271   [ +  +  +  - ]:           1898 :             (!only_running || w->proc))
                                272                 :                :         {
 3152                           273                 :           1898 :             res = w;
                                274                 :           1898 :             break;
                                275                 :                :         }
                                276                 :                :     }
                                277                 :                : 
                                278                 :           3120 :     return res;
                                279                 :                : }
                                280                 :                : 
                                281                 :                : /*
                                282                 :                :  * Similar to logicalrep_worker_find(), but returns a list of all workers for
                                283                 :                :  * the subscription, instead of just one.
                                284                 :                :  */
                                285                 :                : List *
  409 akapila@postgresql.o      286                 :            647 : logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
                                287                 :                : {
                                288                 :                :     int         i;
 2955 peter_e@gmx.net           289                 :            647 :     List       *res = NIL;
                                290                 :                : 
  409 akapila@postgresql.o      291         [ +  + ]:            647 :     if (acquire_lock)
                                292                 :            117 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                293                 :                : 
 2955 peter_e@gmx.net           294         [ -  + ]:            647 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                295                 :                : 
                                296                 :                :     /* Search for attached worker for a given subscription id. */
                                297         [ +  + ]:           3363 :     for (i = 0; i < max_logical_replication_workers; i++)
                                298                 :                :     {
                                299                 :           2716 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                300                 :                : 
                                301   [ +  +  +  +  :           2716 :         if (w->in_use && w->subid == subid && (!only_running || w->proc))
                                        +  +  +  + ]
                                302                 :            462 :             res = lappend(res, w);
                                303                 :                :     }
                                304                 :                : 
  409 akapila@postgresql.o      305         [ +  + ]:            647 :     if (acquire_lock)
                                306                 :            117 :         LWLockRelease(LogicalRepWorkerLock);
                                307                 :                : 
 2955 peter_e@gmx.net           308                 :            647 :     return res;
                                309                 :                : }
                                310                 :                : 
                                311                 :                : /*
                                312                 :                :  * Start new logical replication background worker, if possible.
                                313                 :                :  *
                                314                 :                :  * Returns true on success, false on failure.
                                315                 :                :  */
                                316                 :                : bool
  754 akapila@postgresql.o      317                 :            410 : logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                318                 :                :                          Oid dbid, Oid subid, const char *subname, Oid userid,
                                319                 :                :                          Oid relid, dsm_handle subworker_dsm,
                                320                 :                :                          bool retain_dead_tuples)
                                321                 :                : {
                                322                 :                :     BackgroundWorker bgw;
                                323                 :                :     BackgroundWorkerHandle *bgw_handle;
                                324                 :                :     uint16      generation;
                                325                 :                :     int         i;
 3034 bruce@momjian.us          326                 :            410 :     int         slot = 0;
                                327                 :            410 :     LogicalRepWorker *worker = NULL;
                                328                 :                :     int         nsyncworkers;
                                329                 :                :     int         nparallelapplyworkers;
                                330                 :                :     TimestampTz now;
  754 akapila@postgresql.o      331                 :            410 :     bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
                                332                 :            410 :     bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
                                333                 :                : 
                                334                 :                :     /*----------
                                335                 :                :      * Sanity checks:
                                336                 :                :      * - must be valid worker type
                                337                 :                :      * - tablesync workers are only ones to have relid
                                338                 :                :      * - parallel apply worker is the only kind of subworker
                                339                 :                :      * - The replication slot used in conflict detection is created when
                                340                 :                :      *   retain_dead_tuples is enabled
                                341                 :                :      */
                                342         [ -  + ]:            410 :     Assert(wtype != WORKERTYPE_UNKNOWN);
                                343         [ -  + ]:            410 :     Assert(is_tablesync_worker == OidIsValid(relid));
                                344         [ -  + ]:            410 :     Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
   45 akapila@postgresql.o      345   [ +  +  -  + ]:GNC         410 :     Assert(!retain_dead_tuples || MyReplicationSlot);
                                346                 :                : 
 3027 peter_e@gmx.net           347         [ +  + ]:CBC         410 :     ereport(DEBUG1,
                                348                 :                :             (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                                349                 :                :                              subname)));
                                350                 :                : 
                                351                 :                :     /* Report this after the initial starting message for consistency. */
  169 msawada@postgresql.o      352         [ -  + ]:            410 :     if (max_active_replication_origins == 0)
 3152 peter_e@gmx.net           353         [ #  # ]:UBC           0 :         ereport(ERROR,
                                354                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                355                 :                :                  errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
                                356                 :                : 
                                357                 :                :     /*
                                358                 :                :      * We need to do the modification of the shared memory under lock so that
                                359                 :                :      * we have consistent view.
                                360                 :                :      */
 3152 peter_e@gmx.net           361                 :CBC         410 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                362                 :                : 
 3055                           363                 :            410 : retry:
                                364                 :                :     /* Find unused worker slot. */
                                365         [ +  - ]:            707 :     for (i = 0; i < max_logical_replication_workers; i++)
                                366                 :                :     {
                                367                 :            707 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                368                 :                : 
                                369         [ +  + ]:            707 :         if (!w->in_use)
                                370                 :                :         {
                                371                 :            410 :             worker = w;
                                372                 :            410 :             slot = i;
 3152                           373                 :            410 :             break;
                                374                 :                :         }
                                375                 :                :     }
                                376                 :                : 
 3055                           377                 :            410 :     nsyncworkers = logicalrep_sync_worker_count(subid);
                                378                 :                : 
                                379                 :            410 :     now = GetCurrentTimestamp();
                                380                 :                : 
                                381                 :                :     /*
                                382                 :                :      * If we didn't find a free slot, try to do garbage collection.  The
                                383                 :                :      * reason we do this is because if some worker failed to start up and its
                                384                 :                :      * parent has crashed while waiting, the in_use state was never cleared.
                                385                 :                :      */
                                386   [ +  -  -  + ]:            410 :     if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
                                387                 :                :     {
 3034 bruce@momjian.us          388                 :UBC           0 :         bool        did_cleanup = false;
                                389                 :                : 
 3055 peter_e@gmx.net           390         [ #  # ]:              0 :         for (i = 0; i < max_logical_replication_workers; i++)
                                391                 :                :         {
                                392                 :              0 :             LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                393                 :                : 
                                394                 :                :             /*
                                395                 :                :              * If the worker was marked in use but didn't manage to attach in
                                396                 :                :              * time, clean it up.
                                397                 :                :              */
                                398   [ #  #  #  #  :              0 :             if (w->in_use && !w->proc &&
                                              #  # ]
                                399                 :              0 :                 TimestampDifferenceExceeds(w->launch_time, now,
                                400                 :                :                                            wal_receiver_timeout))
                                401                 :                :             {
                                402         [ #  # ]:              0 :                 elog(WARNING,
                                403                 :                :                      "logical replication worker for subscription %u took too long to start; canceled",
                                404                 :                :                      w->subid);
                                405                 :                : 
                                406                 :              0 :                 logicalrep_worker_cleanup(w);
                                407                 :              0 :                 did_cleanup = true;
                                408                 :                :             }
                                409                 :                :         }
                                410                 :                : 
                                411         [ #  # ]:              0 :         if (did_cleanup)
                                412                 :              0 :             goto retry;
                                413                 :                :     }
                                414                 :                : 
                                415                 :                :     /*
                                416                 :                :      * We don't allow to invoke more sync workers once we have reached the
                                417                 :                :      * sync worker limit per subscription. So, just return silently as we
                                418                 :                :      * might get here because of an otherwise harmless race condition.
                                419                 :                :      */
  754 akapila@postgresql.o      420   [ +  +  -  + ]:CBC         410 :     if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
                                421                 :                :     {
 3055 peter_e@gmx.net           422                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
  971 akapila@postgresql.o      423                 :              0 :         return false;
                                424                 :                :     }
                                425                 :                : 
  971 akapila@postgresql.o      426                 :CBC         410 :     nparallelapplyworkers = logicalrep_pa_worker_count(subid);
                                427                 :                : 
                                428                 :                :     /*
                                429                 :                :      * Return false if the number of parallel apply workers reached the limit
                                430                 :                :      * per subscription.
                                431                 :                :      */
                                432         [ +  + ]:            410 :     if (is_parallel_apply_worker &&
                                433         [ -  + ]:             10 :         nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
                                434                 :                :     {
  971 akapila@postgresql.o      435                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                436                 :              0 :         return false;
                                437                 :                :     }
                                438                 :                : 
                                439                 :                :     /*
                                440                 :                :      * However if there are no more free worker slots, inform user about it
                                441                 :                :      * before exiting.
                                442                 :                :      */
 3152 peter_e@gmx.net           443         [ -  + ]:CBC         410 :     if (worker == NULL)
                                444                 :                :     {
 3147 fujii@postgresql.org      445                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3152 peter_e@gmx.net           446         [ #  # ]:              0 :         ereport(WARNING,
                                447                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                448                 :                :                  errmsg("out of logical replication worker slots"),
                                449                 :                :                  errhint("You might need to increase \"%s\".", "max_logical_replication_workers")));
  971 akapila@postgresql.o      450                 :              0 :         return false;
                                451                 :                :     }
                                452                 :                : 
                                453                 :                :     /* Prepare the worker slot. */
  754 akapila@postgresql.o      454                 :CBC         410 :     worker->type = wtype;
 3055 peter_e@gmx.net           455                 :            410 :     worker->launch_time = now;
                                456                 :            410 :     worker->in_use = true;
                                457                 :            410 :     worker->generation++;
 3089                           458                 :            410 :     worker->proc = NULL;
 3152                           459                 :            410 :     worker->dbid = dbid;
                                460                 :            410 :     worker->userid = userid;
                                461                 :            410 :     worker->subid = subid;
 3089                           462                 :            410 :     worker->relid = relid;
                                463                 :            410 :     worker->relstate = SUBREL_STATE_UNKNOWN;
                                464                 :            410 :     worker->relstate_lsn = InvalidXLogRecPtr;
 1465 akapila@postgresql.o      465                 :            410 :     worker->stream_fileset = NULL;
  962                           466         [ +  + ]:            410 :     worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
  971                           467                 :            410 :     worker->parallel_apply = is_parallel_apply_worker;
   45 akapila@postgresql.o      468                 :GNC         410 :     worker->oldest_nonremovable_xid = retain_dead_tuples
                                469                 :              1 :         ? MyReplicationSlot->data.xmin
                                470         [ +  + ]:            410 :         : InvalidTransactionId;
 3089 peter_e@gmx.net           471                 :CBC         410 :     worker->last_lsn = InvalidXLogRecPtr;
                                472                 :            410 :     TIMESTAMP_NOBEGIN(worker->last_send_time);
                                473                 :            410 :     TIMESTAMP_NOBEGIN(worker->last_recv_time);
                                474                 :            410 :     worker->reply_lsn = InvalidXLogRecPtr;
                                475                 :            410 :     TIMESTAMP_NOBEGIN(worker->reply_time);
                                476                 :                : 
                                477                 :                :     /* Before releasing lock, remember generation for future identification. */
 2910 tgl@sss.pgh.pa.us         478                 :            410 :     generation = worker->generation;
                                479                 :                : 
 3152 peter_e@gmx.net           480                 :            410 :     LWLockRelease(LogicalRepWorkerLock);
                                481                 :                : 
                                482                 :                :     /* Register the new dynamic worker. */
 3065 tgl@sss.pgh.pa.us         483                 :            410 :     memset(&bgw, 0, sizeof(bgw));
 3034 bruce@momjian.us          484                 :            410 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                485                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3152 peter_e@gmx.net           486                 :            410 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  796 nathan@postgresql.or      487                 :            410 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
                                488                 :                : 
  746 akapila@postgresql.o      489   [ +  +  +  -  :            410 :     switch (worker->type)
                                                 - ]
                                490                 :                :     {
                                491                 :            204 :         case WORKERTYPE_APPLY:
                                492                 :            204 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
                                493                 :            204 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                494                 :                :                      "logical replication apply worker for subscription %u",
                                495                 :                :                      subid);
                                496                 :            204 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
                                497                 :            204 :             break;
                                498                 :                : 
                                499                 :             10 :         case WORKERTYPE_PARALLEL_APPLY:
                                500                 :             10 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
                                501                 :             10 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                502                 :                :                      "logical replication parallel apply worker for subscription %u",
                                503                 :                :                      subid);
                                504                 :             10 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
                                505                 :                : 
                                506                 :             10 :             memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
                                507                 :             10 :             break;
                                508                 :                : 
                                509                 :            196 :         case WORKERTYPE_TABLESYNC:
                                510                 :            196 :             snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
                                511                 :            196 :             snprintf(bgw.bgw_name, BGW_MAXLEN,
                                512                 :                :                      "logical replication tablesync worker for subscription %u sync %u",
                                513                 :                :                      subid,
                                514                 :                :                      relid);
                                515                 :            196 :             snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
                                516                 :            196 :             break;
                                517                 :                : 
  746 akapila@postgresql.o      518                 :UBC           0 :         case WORKERTYPE_UNKNOWN:
                                519                 :                :             /* Should never happen. */
                                520         [ #  # ]:              0 :             elog(ERROR, "unknown worker type");
                                521                 :                :     }
                                522                 :                : 
 3152 peter_e@gmx.net           523                 :CBC         410 :     bgw.bgw_restart_time = BGW_NEVER_RESTART;
                                524                 :            410 :     bgw.bgw_notify_pid = MyProcPid;
 3062 fujii@postgresql.org      525                 :            410 :     bgw.bgw_main_arg = Int32GetDatum(slot);
                                526                 :                : 
 3152 peter_e@gmx.net           527         [ -  + ]:            410 :     if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
                                528                 :                :     {
                                529                 :                :         /* Failed to start worker, so clean up the worker slot. */
 2910 tgl@sss.pgh.pa.us         530                 :UBC           0 :         LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                531         [ #  # ]:              0 :         Assert(generation == worker->generation);
                                532                 :              0 :         logicalrep_worker_cleanup(worker);
                                533                 :              0 :         LWLockRelease(LogicalRepWorkerLock);
                                534                 :                : 
 3152 peter_e@gmx.net           535         [ #  # ]:              0 :         ereport(WARNING,
                                536                 :                :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                                537                 :                :                  errmsg("out of background worker slots"),
                                538                 :                :                  errhint("You might need to increase \"%s\".", "max_worker_processes")));
  971 akapila@postgresql.o      539                 :              0 :         return false;
                                540                 :                :     }
                                541                 :                : 
                                542                 :                :     /* Now wait until it attaches. */
  971 akapila@postgresql.o      543                 :CBC         410 :     return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
                                544                 :                : }
                                545                 :                : 
                                546                 :                : /*
                                547                 :                :  * Internal function to stop the worker and wait until it detaches from the
                                548                 :                :  * slot.
                                549                 :                :  */
                                550                 :                : static void
                                551                 :             83 : logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
                                552                 :                : {
                                553                 :                :     uint16      generation;
                                554                 :                : 
                                555         [ -  + ]:             83 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
                                556                 :                : 
                                557                 :                :     /*
                                558                 :                :      * Remember which generation was our worker so we can check if what we see
                                559                 :                :      * is still the same one.
                                560                 :                :      */
 3055 peter_e@gmx.net           561                 :             83 :     generation = worker->generation;
                                562                 :                : 
                                563                 :                :     /*
                                564                 :                :      * If we found a worker but it does not have proc set then it is still
                                565                 :                :      * starting up; wait for it to finish starting and then kill it.
                                566                 :                :      */
                                567   [ +  -  +  + ]:             83 :     while (worker->in_use && !worker->proc)
                                568                 :                :     {
                                569                 :                :         int         rc;
                                570                 :                : 
 3152                           571                 :              3 :         LWLockRelease(LogicalRepWorkerLock);
                                572                 :                : 
                                573                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3014 andres@anarazel.de        574                 :              3 :         rc = WaitLatch(MyLatch,
                                575                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                576                 :                :                        10L, WAIT_EVENT_BGWORKER_STARTUP);
                                577                 :                : 
                                578         [ -  + ]:              3 :         if (rc & WL_LATCH_SET)
                                579                 :                :         {
 3014 andres@anarazel.de        580                 :UBC           0 :             ResetLatch(MyLatch);
                                581         [ #  # ]:              0 :             CHECK_FOR_INTERRUPTS();
                                582                 :                :         }
                                583                 :                : 
                                584                 :                :         /* Recheck worker status. */
 3152 peter_e@gmx.net           585                 :CBC           3 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                586                 :                : 
                                587                 :                :         /*
                                588                 :                :          * Check whether the worker slot is no longer used, which would mean
                                589                 :                :          * that the worker has exited, or whether the worker generation is
                                590                 :                :          * different, meaning that a different worker has taken the slot.
                                591                 :                :          */
 3055                           592   [ +  -  -  + ]:              3 :         if (!worker->in_use || worker->generation != generation)
 3148 peter_e@gmx.net           593                 :UBC           0 :             return;
                                594                 :                : 
                                595                 :                :         /* Worker has assigned proc, so it has started. */
 3148 peter_e@gmx.net           596         [ +  - ]:CBC           3 :         if (worker->proc)
 3152                           597                 :              3 :             break;
                                598                 :                :     }
                                599                 :                : 
                                600                 :                :     /* Now terminate the worker ... */
  971 akapila@postgresql.o      601                 :             83 :     kill(worker->proc->pid, signo);
                                602                 :                : 
                                603                 :                :     /* ... and wait for it to die. */
                                604                 :                :     for (;;)
 3152 peter_e@gmx.net           605                 :            107 :     {
                                606                 :                :         int         rc;
                                607                 :                : 
                                608                 :                :         /* is it gone? */
 3055                           609   [ +  +  +  + ]:            190 :         if (!worker->proc || worker->generation != generation)
                                610                 :                :             break;
                                611                 :                : 
 2989 tgl@sss.pgh.pa.us         612                 :            107 :         LWLockRelease(LogicalRepWorkerLock);
                                613                 :                : 
                                614                 :                :         /* Wait a bit --- we don't expect to have to wait long. */
 3014 andres@anarazel.de        615                 :            107 :         rc = WaitLatch(MyLatch,
                                616                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                617                 :                :                        10L, WAIT_EVENT_BGWORKER_SHUTDOWN);
                                618                 :                : 
                                619         [ +  + ]:            107 :         if (rc & WL_LATCH_SET)
                                620                 :                :         {
                                621                 :             33 :             ResetLatch(MyLatch);
                                622         [ +  + ]:             33 :             CHECK_FOR_INTERRUPTS();
                                623                 :                :         }
                                624                 :                : 
 2989 tgl@sss.pgh.pa.us         625                 :            107 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                626                 :                :     }
                                627                 :                : }
                                628                 :                : 
                                629                 :                : /*
                                630                 :                :  * Stop the logical replication worker for subid/relid, if any.
                                631                 :                :  */
                                632                 :                : void
  971 akapila@postgresql.o      633                 :             94 : logicalrep_worker_stop(Oid subid, Oid relid)
                                634                 :                : {
                                635                 :                :     LogicalRepWorker *worker;
                                636                 :                : 
                                637                 :             94 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                638                 :                : 
                                639                 :             94 :     worker = logicalrep_worker_find(subid, relid, false);
                                640                 :                : 
                                641         [ +  + ]:             94 :     if (worker)
                                642                 :                :     {
                                643   [ +  -  -  + ]:             74 :         Assert(!isParallelApplyWorker(worker));
                                644                 :             74 :         logicalrep_worker_stop_internal(worker, SIGTERM);
                                645                 :                :     }
                                646                 :                : 
                                647                 :             94 :     LWLockRelease(LogicalRepWorkerLock);
                                648                 :             94 : }
                                649                 :                : 
                                650                 :                : /*
                                651                 :                :  * Stop the given logical replication parallel apply worker.
                                652                 :                :  *
                                653                 :                :  * Node that the function sends SIGINT instead of SIGTERM to the parallel apply
                                654                 :                :  * worker so that the worker exits cleanly.
                                655                 :                :  */
                                656                 :                : void
  851                           657                 :              5 : logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
                                658                 :                : {
                                659                 :                :     int         slot_no;
                                660                 :                :     uint16      generation;
                                661                 :                :     LogicalRepWorker *worker;
                                662                 :                : 
                                663         [ -  + ]:              5 :     SpinLockAcquire(&winfo->shared->mutex);
                                664                 :              5 :     generation = winfo->shared->logicalrep_worker_generation;
                                665                 :              5 :     slot_no = winfo->shared->logicalrep_worker_slot_no;
                                666                 :              5 :     SpinLockRelease(&winfo->shared->mutex);
                                667                 :                : 
  971                           668   [ +  -  -  + ]:              5 :     Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
                                669                 :                : 
                                670                 :                :     /*
                                671                 :                :      * Detach from the error_mq_handle for the parallel apply worker before
                                672                 :                :      * stopping it. This prevents the leader apply worker from trying to
                                673                 :                :      * receive the message from the error queue that might already be detached
                                674                 :                :      * by the parallel apply worker.
                                675                 :                :      */
  851                           676         [ +  - ]:              5 :     if (winfo->error_mq_handle)
                                677                 :                :     {
                                678                 :              5 :         shm_mq_detach(winfo->error_mq_handle);
                                679                 :              5 :         winfo->error_mq_handle = NULL;
                                680                 :                :     }
                                681                 :                : 
  971                           682                 :              5 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                683                 :                : 
                                684                 :              5 :     worker = &LogicalRepCtx->workers[slot_no];
                                685   [ +  -  -  + ]:              5 :     Assert(isParallelApplyWorker(worker));
                                686                 :                : 
                                687                 :                :     /*
                                688                 :                :      * Only stop the worker if the generation matches and the worker is alive.
                                689                 :                :      */
                                690   [ +  -  +  - ]:              5 :     if (worker->generation == generation && worker->proc)
                                691                 :              5 :         logicalrep_worker_stop_internal(worker, SIGINT);
                                692                 :                : 
 2989 tgl@sss.pgh.pa.us         693                 :              5 :     LWLockRelease(LogicalRepWorkerLock);
 3152 peter_e@gmx.net           694                 :              5 : }
                                695                 :                : 
                                696                 :                : /*
                                697                 :                :  * Wake up (using latch) any logical replication worker for specified sub/rel.
                                698                 :                :  */
                                699                 :                : void
 3089                           700                 :            210 : logicalrep_worker_wakeup(Oid subid, Oid relid)
                                701                 :                : {
                                702                 :                :     LogicalRepWorker *worker;
                                703                 :                : 
                                704                 :            210 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                705                 :                : 
                                706                 :            210 :     worker = logicalrep_worker_find(subid, relid, true);
                                707                 :                : 
                                708         [ +  - ]:            210 :     if (worker)
                                709                 :            210 :         logicalrep_worker_wakeup_ptr(worker);
                                710                 :                : 
 2990 tgl@sss.pgh.pa.us         711                 :            210 :     LWLockRelease(LogicalRepWorkerLock);
 3089 peter_e@gmx.net           712                 :            210 : }
                                713                 :                : 
                                714                 :                : /*
                                715                 :                :  * Wake up (using latch) the specified logical replication worker.
                                716                 :                :  *
                                717                 :                :  * Caller must hold lock, else worker->proc could change under us.
                                718                 :                :  */
                                719                 :                : void
                                720                 :            638 : logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
                                721                 :                : {
 2990 tgl@sss.pgh.pa.us         722         [ -  + ]:            638 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                723                 :                : 
 3089 peter_e@gmx.net           724                 :            638 :     SetLatch(&worker->proc->procLatch);
                                725                 :            638 : }
                                726                 :                : 
                                727                 :                : /*
                                728                 :                :  * Attach to a slot.
                                729                 :                :  */
                                730                 :                : void
 3152                           731                 :            528 : logicalrep_worker_attach(int slot)
                                732                 :                : {
                                733                 :                :     /* Block concurrent access. */
                                734                 :            528 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                735                 :                : 
                                736   [ +  -  -  + ]:            528 :     Assert(slot >= 0 && slot < max_logical_replication_workers);
                                737                 :            528 :     MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
                                738                 :                : 
 3055                           739         [ -  + ]:            528 :     if (!MyLogicalRepWorker->in_use)
                                740                 :                :     {
 3055 peter_e@gmx.net           741                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
                                742         [ #  # ]:              0 :         ereport(ERROR,
                                743                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                744                 :                :                  errmsg("logical replication worker slot %d is empty, cannot attach",
                                745                 :                :                         slot)));
                                746                 :                :     }
                                747                 :                : 
 3152 peter_e@gmx.net           748         [ -  + ]:CBC         528 :     if (MyLogicalRepWorker->proc)
                                749                 :                :     {
 3055 peter_e@gmx.net           750                 :UBC           0 :         LWLockRelease(LogicalRepWorkerLock);
 3152                           751         [ #  # ]:              0 :         ereport(ERROR,
                                752                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                753                 :                :                  errmsg("logical replication worker slot %d is already used by "
                                754                 :                :                         "another worker, cannot attach", slot)));
                                755                 :                :     }
                                756                 :                : 
 3152 peter_e@gmx.net           757                 :CBC         528 :     MyLogicalRepWorker->proc = MyProc;
                                758                 :            528 :     before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
                                759                 :                : 
                                760                 :            528 :     LWLockRelease(LogicalRepWorkerLock);
                                761                 :            528 : }
                                762                 :                : 
                                763                 :                : /*
                                764                 :                :  * Stop the parallel apply workers if any, and detach the leader apply worker
                                765                 :                :  * (cleans up the worker info).
                                766                 :                :  */
                                767                 :                : static void
                                768                 :            528 : logicalrep_worker_detach(void)
                                769                 :                : {
                                770                 :                :     /* Stop the parallel apply workers. */
  971 akapila@postgresql.o      771         [ +  + ]:            528 :     if (am_leader_apply_worker())
                                772                 :                :     {
                                773                 :                :         List       *workers;
                                774                 :                :         ListCell   *lc;
                                775                 :                : 
                                776                 :                :         /*
                                777                 :                :          * Detach from the error_mq_handle for all parallel apply workers
                                778                 :                :          * before terminating them. This prevents the leader apply worker from
                                779                 :                :          * receiving the worker termination message and sending it to logs
                                780                 :                :          * when the same is already done by the parallel worker.
                                781                 :                :          */
                                782                 :            320 :         pa_detach_all_error_mq();
                                783                 :                : 
                                784                 :            320 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                785                 :                : 
  409                           786                 :            320 :         workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
  971                           787   [ +  -  +  +  :            645 :         foreach(lc, workers)
                                              +  + ]
                                788                 :                :         {
                                789                 :            325 :             LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
                                790                 :                : 
                                791   [ +  -  +  + ]:            325 :             if (isParallelApplyWorker(w))
                                792                 :              4 :                 logicalrep_worker_stop_internal(w, SIGTERM);
                                793                 :                :         }
                                794                 :                : 
                                795                 :            320 :         LWLockRelease(LogicalRepWorkerLock);
                                796                 :                : 
   35 tgl@sss.pgh.pa.us         797                 :GNC         320 :         list_free(workers);
                                798                 :                :     }
                                799                 :                : 
                                800                 :                :     /* Block concurrent access. */
 3152 peter_e@gmx.net           801                 :CBC         528 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                                802                 :                : 
 3055                           803                 :            528 :     logicalrep_worker_cleanup(MyLogicalRepWorker);
                                804                 :                : 
 3152                           805                 :            528 :     LWLockRelease(LogicalRepWorkerLock);
                                806                 :            528 : }
                                807                 :                : 
                                808                 :                : /*
                                809                 :                :  * Clean up worker info.
                                810                 :                :  */
                                811                 :                : static void
 3055                           812                 :            528 : logicalrep_worker_cleanup(LogicalRepWorker *worker)
                                813                 :                : {
                                814         [ -  + ]:            528 :     Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
                                815                 :                : 
  743 akapila@postgresql.o      816                 :            528 :     worker->type = WORKERTYPE_UNKNOWN;
 3055 peter_e@gmx.net           817                 :            528 :     worker->in_use = false;
                                818                 :            528 :     worker->proc = NULL;
                                819                 :            528 :     worker->dbid = InvalidOid;
                                820                 :            528 :     worker->userid = InvalidOid;
                                821                 :            528 :     worker->subid = InvalidOid;
                                822                 :            528 :     worker->relid = InvalidOid;
  962 akapila@postgresql.o      823                 :            528 :     worker->leader_pid = InvalidPid;
  971                           824                 :            528 :     worker->parallel_apply = false;
 3055 peter_e@gmx.net           825                 :            528 : }
                                826                 :                : 
                                827                 :                : /*
                                828                 :                :  * Cleanup function for logical replication launcher.
                                829                 :                :  *
                                830                 :                :  * Called on logical replication launcher exit.
                                831                 :                :  */
                                832                 :                : static void
 3062 fujii@postgresql.org      833                 :            387 : logicalrep_launcher_onexit(int code, Datum arg)
                                834                 :                : {
                                835                 :            387 :     LogicalRepCtx->launcher_pid = 0;
                                836                 :            387 : }
                                837                 :                : 
                                838                 :                : /*
                                839                 :                :  * Cleanup function.
                                840                 :                :  *
                                841                 :                :  * Called on logical replication worker exit.
                                842                 :                :  */
                                843                 :                : static void
 3152 peter_e@gmx.net           844                 :            528 : logicalrep_worker_onexit(int code, Datum arg)
                                845                 :                : {
                                846                 :                :     /* Disconnect gracefully from the remote side. */
 1578 alvherre@alvh.no-ip.      847         [ +  + ]:            528 :     if (LogRepWorkerWalRcvConn)
                                848                 :            413 :         walrcv_disconnect(LogRepWorkerWalRcvConn);
                                849                 :                : 
 3152 peter_e@gmx.net           850                 :            528 :     logicalrep_worker_detach();
                                851                 :                : 
                                852                 :                :     /* Cleanup fileset used for streaming transactions. */
 1465 akapila@postgresql.o      853         [ +  + ]:            528 :     if (MyLogicalRepWorker->stream_fileset != NULL)
                                854                 :             14 :         FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
                                855                 :                : 
                                856                 :                :     /*
                                857                 :                :      * Session level locks may be acquired outside of a transaction in
                                858                 :                :      * parallel apply mode and will not be released when the worker
                                859                 :                :      * terminates, so manually release all locks before the worker exits.
                                860                 :                :      *
                                861                 :                :      * The locks will be acquired once the worker is initialized.
                                862                 :                :      */
  857                           863         [ +  + ]:            528 :     if (!InitializingApplyWorker)
                                864                 :            465 :         LockReleaseAll(DEFAULT_LOCKMETHOD, true);
                                865                 :                : 
 3019 peter_e@gmx.net           866                 :            528 :     ApplyLauncherWakeup();
 3152                           867                 :            528 : }
                                868                 :                : 
                                869                 :                : /*
                                870                 :                :  * Count the number of registered (not necessarily running) sync workers
                                871                 :                :  * for a subscription.
                                872                 :                :  */
                                873                 :                : int
 3089                           874                 :           1313 : logicalrep_sync_worker_count(Oid subid)
                                875                 :                : {
                                876                 :                :     int         i;
 3034 bruce@momjian.us          877                 :           1313 :     int         res = 0;
                                878                 :                : 
 3089 peter_e@gmx.net           879         [ -  + ]:           1313 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                880                 :                : 
                                881                 :                :     /* Search for attached worker for a given subscription id. */
                                882         [ +  + ]:           6771 :     for (i = 0; i < max_logical_replication_workers; i++)
                                883                 :                :     {
 3034 bruce@momjian.us          884                 :           5458 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                885                 :                : 
  743 akapila@postgresql.o      886   [ +  +  +  +  :           5458 :         if (isTablesyncWorker(w) && w->subid == subid)
                                              +  + ]
 3089 peter_e@gmx.net           887                 :           1541 :             res++;
                                888                 :                :     }
                                889                 :                : 
                                890                 :           1313 :     return res;
                                891                 :                : }
                                892                 :                : 
                                893                 :                : /*
                                894                 :                :  * Count the number of registered (but not necessarily running) parallel apply
                                895                 :                :  * workers for a subscription.
                                896                 :                :  */
                                897                 :                : static int
  971 akapila@postgresql.o      898                 :            410 : logicalrep_pa_worker_count(Oid subid)
                                899                 :                : {
                                900                 :                :     int         i;
                                901                 :            410 :     int         res = 0;
                                902                 :                : 
                                903         [ -  + ]:            410 :     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
                                904                 :                : 
                                905                 :                :     /*
                                906                 :                :      * Scan all attached parallel apply workers, only counting those which
                                907                 :                :      * have the given subscription id.
                                908                 :                :      */
                                909         [ +  + ]:           2174 :     for (i = 0; i < max_logical_replication_workers; i++)
                                910                 :                :     {
                                911                 :           1764 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                                912                 :                : 
  743                           913   [ +  +  +  +  :           1764 :         if (isParallelApplyWorker(w) && w->subid == subid)
                                              +  - ]
  971                           914                 :              2 :             res++;
                                915                 :                :     }
                                916                 :                : 
                                917                 :            410 :     return res;
                                918                 :                : }
                                919                 :                : 
                                920                 :                : /*
                                921                 :                :  * ApplyLauncherShmemSize
                                922                 :                :  *      Compute space needed for replication launcher shared memory
                                923                 :                :  */
                                924                 :                : Size
 3152 peter_e@gmx.net           925                 :           3967 : ApplyLauncherShmemSize(void)
                                926                 :                : {
                                927                 :                :     Size        size;
                                928                 :                : 
                                929                 :                :     /*
                                930                 :                :      * Need the fixed struct and the array of LogicalRepWorker.
                                931                 :                :      */
                                932                 :           3967 :     size = sizeof(LogicalRepCtxStruct);
                                933                 :           3967 :     size = MAXALIGN(size);
                                934                 :           3967 :     size = add_size(size, mul_size(max_logical_replication_workers,
                                935                 :                :                                    sizeof(LogicalRepWorker)));
                                936                 :           3967 :     return size;
                                937                 :                : }
                                938                 :                : 
                                939                 :                : /*
                                940                 :                :  * ApplyLauncherRegister
                                941                 :                :  *      Register a background worker running the logical replication launcher.
                                942                 :                :  */
                                943                 :                : void
                                944                 :            813 : ApplyLauncherRegister(void)
                                945                 :                : {
                                946                 :                :     BackgroundWorker bgw;
                                947                 :                : 
                                948                 :                :     /*
                                949                 :                :      * The logical replication launcher is disabled during binary upgrades, to
                                950                 :                :      * prevent logical replication workers from running on the source cluster.
                                951                 :                :      * That could cause replication origins to move forward after having been
                                952                 :                :      * copied to the target cluster, potentially creating conflicts with the
                                953                 :                :      * copied data files.
                                954                 :                :      */
  603 michael@paquier.xyz       955   [ +  +  +  + ]:            813 :     if (max_logical_replication_workers == 0 || IsBinaryUpgrade)
 3152 peter_e@gmx.net           956                 :             53 :         return;
                                957                 :                : 
 3065 tgl@sss.pgh.pa.us         958                 :            760 :     memset(&bgw, 0, sizeof(bgw));
 3034 bruce@momjian.us          959                 :            760 :     bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
                                960                 :                :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3152 peter_e@gmx.net           961                 :            760 :     bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
  796 nathan@postgresql.or      962                 :            760 :     snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 3081 rhaas@postgresql.org      963                 :            760 :     snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
 3152 peter_e@gmx.net           964                 :            760 :     snprintf(bgw.bgw_name, BGW_MAXLEN,
                                965                 :                :              "logical replication launcher");
 2928                           966                 :            760 :     snprintf(bgw.bgw_type, BGW_MAXLEN,
                                967                 :                :              "logical replication launcher");
 3152                           968                 :            760 :     bgw.bgw_restart_time = 5;
                                969                 :            760 :     bgw.bgw_notify_pid = 0;
                                970                 :            760 :     bgw.bgw_main_arg = (Datum) 0;
                                971                 :                : 
                                972                 :            760 :     RegisterBackgroundWorker(&bgw);
                                973                 :                : }
                                974                 :                : 
                                975                 :                : /*
                                976                 :                :  * ApplyLauncherShmemInit
                                977                 :                :  *      Allocate and initialize replication launcher shared memory
                                978                 :                :  */
                                979                 :                : void
                                980                 :           1029 : ApplyLauncherShmemInit(void)
                                981                 :                : {
                                982                 :                :     bool        found;
                                983                 :                : 
                                984                 :           1029 :     LogicalRepCtx = (LogicalRepCtxStruct *)
                                985                 :           1029 :         ShmemInitStruct("Logical Replication Launcher Data",
                                986                 :                :                         ApplyLauncherShmemSize(),
                                987                 :                :                         &found);
                                988                 :                : 
                                989         [ +  - ]:           1029 :     if (!found)
                                990                 :                :     {
                                991                 :                :         int         slot;
                                992                 :                : 
                                993                 :           1029 :         memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
                                994                 :                : 
  955 tgl@sss.pgh.pa.us         995                 :           1029 :         LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
                                996                 :           1029 :         LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
                                997                 :                : 
                                998                 :                :         /* Initialize memory and spin locks for each worker slot. */
 3089 peter_e@gmx.net           999         [ +  + ]:           5108 :         for (slot = 0; slot < max_logical_replication_workers; slot++)
                               1000                 :                :         {
                               1001                 :           4079 :             LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];
                               1002                 :                : 
                               1003                 :           4079 :             memset(worker, 0, sizeof(LogicalRepWorker));
                               1004                 :           4079 :             SpinLockInit(&worker->relmutex);
                               1005                 :                :         }
                               1006                 :                :     }
 3152                          1007                 :           1029 : }
                               1008                 :                : 
                               1009                 :                : /*
                               1010                 :                :  * Initialize or attach to the dynamic shared hash table that stores the
                               1011                 :                :  * last-start times, if not already done.
                               1012                 :                :  * This must be called before accessing the table.
                               1013                 :                :  */
                               1014                 :                : static void
  958 tgl@sss.pgh.pa.us        1015                 :            728 : logicalrep_launcher_attach_dshmem(void)
                               1016                 :                : {
                               1017                 :                :     MemoryContext oldcontext;
                               1018                 :                : 
                               1019                 :                :     /* Quick exit if we already did this. */
  955                          1020         [ +  + ]:            728 :     if (LogicalRepCtx->last_start_dsh != DSHASH_HANDLE_INVALID &&
  958                          1021         [ +  + ]:            675 :         last_start_times != NULL)
                               1022                 :            489 :         return;
                               1023                 :                : 
                               1024                 :                :     /* Otherwise, use a lock to ensure only one process creates the table. */
                               1025                 :            239 :     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
                               1026                 :                : 
                               1027                 :                :     /* Be sure any local memory allocated by DSA routines is persistent. */
                               1028                 :            239 :     oldcontext = MemoryContextSwitchTo(TopMemoryContext);
                               1029                 :                : 
  955                          1030         [ +  + ]:            239 :     if (LogicalRepCtx->last_start_dsh == DSHASH_HANDLE_INVALID)
                               1031                 :                :     {
                               1032                 :                :         /* Initialize dynamic shared hash table for last-start times. */
  958                          1033                 :             53 :         last_start_times_dsa = dsa_create(LWTRANCHE_LAUNCHER_DSA);
                               1034                 :             53 :         dsa_pin(last_start_times_dsa);
                               1035                 :             53 :         dsa_pin_mapping(last_start_times_dsa);
  558 nathan@postgresql.or     1036                 :             53 :         last_start_times = dshash_create(last_start_times_dsa, &dsh_params, NULL);
                               1037                 :                : 
                               1038                 :                :         /* Store handles in shared memory for other backends to use. */
  958 tgl@sss.pgh.pa.us        1039                 :             53 :         LogicalRepCtx->last_start_dsa = dsa_get_handle(last_start_times_dsa);
                               1040                 :             53 :         LogicalRepCtx->last_start_dsh = dshash_get_hash_table_handle(last_start_times);
                               1041                 :                :     }
                               1042         [ +  - ]:            186 :     else if (!last_start_times)
                               1043                 :                :     {
                               1044                 :                :         /* Attach to existing dynamic shared hash table. */
                               1045                 :            186 :         last_start_times_dsa = dsa_attach(LogicalRepCtx->last_start_dsa);
                               1046                 :            186 :         dsa_pin_mapping(last_start_times_dsa);
                               1047                 :            186 :         last_start_times = dshash_attach(last_start_times_dsa, &dsh_params,
   92 nathan@postgresql.or     1048                 :            186 :                                          LogicalRepCtx->last_start_dsh, NULL);
                               1049                 :                :     }
                               1050                 :                : 
  958 tgl@sss.pgh.pa.us        1051                 :            239 :     MemoryContextSwitchTo(oldcontext);
                               1052                 :            239 :     LWLockRelease(LogicalRepWorkerLock);
                               1053                 :                : }
                               1054                 :                : 
                               1055                 :                : /*
                               1056                 :                :  * Set the last-start time for the subscription.
                               1057                 :                :  */
                               1058                 :                : static void
                               1059                 :            204 : ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time)
                               1060                 :                : {
                               1061                 :                :     LauncherLastStartTimesEntry *entry;
                               1062                 :                :     bool        found;
                               1063                 :                : 
                               1064                 :            204 :     logicalrep_launcher_attach_dshmem();
                               1065                 :                : 
                               1066                 :            204 :     entry = dshash_find_or_insert(last_start_times, &subid, &found);
                               1067                 :            204 :     entry->last_start_time = start_time;
                               1068                 :            204 :     dshash_release_lock(last_start_times, entry);
                               1069                 :            204 : }
                               1070                 :                : 
                               1071                 :                : /*
                               1072                 :                :  * Return the last-start time for the subscription, or 0 if there isn't one.
                               1073                 :                :  */
                               1074                 :                : static TimestampTz
                               1075                 :            299 : ApplyLauncherGetWorkerStartTime(Oid subid)
                               1076                 :                : {
                               1077                 :                :     LauncherLastStartTimesEntry *entry;
                               1078                 :                :     TimestampTz ret;
                               1079                 :                : 
                               1080                 :            299 :     logicalrep_launcher_attach_dshmem();
                               1081                 :                : 
                               1082                 :            299 :     entry = dshash_find(last_start_times, &subid, false);
                               1083         [ +  + ]:            299 :     if (entry == NULL)
                               1084                 :            126 :         return 0;
                               1085                 :                : 
                               1086                 :            173 :     ret = entry->last_start_time;
                               1087                 :            173 :     dshash_release_lock(last_start_times, entry);
                               1088                 :                : 
                               1089                 :            173 :     return ret;
                               1090                 :                : }
                               1091                 :                : 
                               1092                 :                : /*
                               1093                 :                :  * Remove the last-start-time entry for the subscription, if one exists.
                               1094                 :                :  *
                               1095                 :                :  * This has two use-cases: to remove the entry related to a subscription
                               1096                 :                :  * that's been deleted or disabled (just to avoid leaking shared memory),
                               1097                 :                :  * and to allow immediate restart of an apply worker that has exited
                               1098                 :                :  * due to subscription parameter changes.
                               1099                 :                :  */
                               1100                 :                : void
                               1101                 :            225 : ApplyLauncherForgetWorkerStartTime(Oid subid)
                               1102                 :                : {
                               1103                 :            225 :     logicalrep_launcher_attach_dshmem();
                               1104                 :                : 
                               1105                 :            225 :     (void) dshash_delete_key(last_start_times, &subid);
                               1106                 :            225 : }
                               1107                 :                : 
                               1108                 :                : /*
                               1109                 :                :  * Wakeup the launcher on commit if requested.
                               1110                 :                :  */
                               1111                 :                : void
 3050 peter_e@gmx.net          1112                 :         318579 : AtEOXact_ApplyLauncher(bool isCommit)
                               1113                 :                : {
 2955                          1114         [ +  + ]:         318579 :     if (isCommit)
                               1115                 :                :     {
                               1116         [ +  + ]:         293340 :         if (on_commit_launcher_wakeup)
                               1117                 :            140 :             ApplyLauncherWakeup();
                               1118                 :                :     }
                               1119                 :                : 
 3050                          1120                 :         318579 :     on_commit_launcher_wakeup = false;
 3152                          1121                 :         318579 : }
                               1122                 :                : 
                               1123                 :                : /*
                               1124                 :                :  * Request wakeup of the launcher on commit of the transaction.
                               1125                 :                :  *
                               1126                 :                :  * This is used to send launcher signal to stop sleeping and process the
                               1127                 :                :  * subscriptions when current transaction commits. Should be used when new
                               1128                 :                :  * tuple was added to the pg_subscription catalog.
                               1129                 :                : */
                               1130                 :                : void
                               1131                 :            140 : ApplyLauncherWakeupAtCommit(void)
                               1132                 :                : {
 3134 heikki.linnakangas@i     1133         [ +  - ]:            140 :     if (!on_commit_launcher_wakeup)
                               1134                 :            140 :         on_commit_launcher_wakeup = true;
 3152 peter_e@gmx.net          1135                 :            140 : }
                               1136                 :                : 
                               1137                 :                : /*
                               1138                 :                :  * Wakeup the launcher immediately.
                               1139                 :                :  */
                               1140                 :                : void
                               1141                 :            693 : ApplyLauncherWakeup(void)
                               1142                 :                : {
 3062 fujii@postgresql.org     1143         [ +  + ]:            693 :     if (LogicalRepCtx->launcher_pid != 0)
 3152 peter_e@gmx.net          1144                 :            667 :         kill(LogicalRepCtx->launcher_pid, SIGUSR1);
                               1145                 :            693 : }
                               1146                 :                : 
                               1147                 :                : /*
                               1148                 :                :  * Main loop for the apply launcher process.
                               1149                 :                :  */
                               1150                 :                : void
                               1151                 :            387 : ApplyLauncherMain(Datum main_arg)
                               1152                 :                : {
 3102 tgl@sss.pgh.pa.us        1153         [ +  + ]:            387 :     ereport(DEBUG1,
                               1154                 :                :             (errmsg_internal("logical replication launcher started")));
                               1155                 :                : 
 3062 fujii@postgresql.org     1156                 :            387 :     before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
                               1157                 :                : 
 3012 andres@anarazel.de       1158         [ -  + ]:            387 :     Assert(LogicalRepCtx->launcher_pid == 0);
                               1159                 :            387 :     LogicalRepCtx->launcher_pid = MyProcPid;
                               1160                 :                : 
                               1161                 :                :     /* Establish signal handlers. */
 1877 akapila@postgresql.o     1162                 :            387 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
 3012 andres@anarazel.de       1163                 :            387 :     pqsignal(SIGTERM, die);
 3152 peter_e@gmx.net          1164                 :            387 :     BackgroundWorkerUnblockSignals();
                               1165                 :                : 
                               1166                 :                :     /*
                               1167                 :                :      * Establish connection to nailed catalogs (we only ever access
                               1168                 :                :      * pg_subscription).
                               1169                 :                :      */
 2711 magnus@hagander.net      1170                 :            387 :     BackgroundWorkerInitializeConnection(NULL, NULL, 0);
                               1171                 :                : 
                               1172                 :                :     /*
                               1173                 :                :      * Acquire the conflict detection slot at startup to ensure it can be
                               1174                 :                :      * dropped if no longer needed after a restart.
                               1175                 :                :      */
   45 akapila@postgresql.o     1176                 :GNC         387 :     acquire_conflict_slot_if_exists();
                               1177                 :                : 
                               1178                 :                :     /* Enter main loop */
                               1179                 :                :     for (;;)
 3152 peter_e@gmx.net          1180                 :CBC        2283 :     {
                               1181                 :                :         int         rc;
                               1182                 :                :         List       *sublist;
                               1183                 :                :         ListCell   *lc;
                               1184                 :                :         MemoryContext subctx;
                               1185                 :                :         MemoryContext oldctx;
 3034 bruce@momjian.us         1186                 :           2670 :         long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
    4 akapila@postgresql.o     1187                 :GNC        2670 :         bool        can_update_xmin = true;
   45                          1188                 :           2670 :         bool        retain_dead_tuples = false;
                               1189                 :           2670 :         TransactionId xmin = InvalidTransactionId;
                               1190                 :                : 
 3012 andres@anarazel.de       1191         [ +  + ]:CBC        2670 :         CHECK_FOR_INTERRUPTS();
                               1192                 :                : 
                               1193                 :                :         /* Use temporary context to avoid leaking memory across cycles. */
  958 tgl@sss.pgh.pa.us        1194                 :           2669 :         subctx = AllocSetContextCreate(TopMemoryContext,
                               1195                 :                :                                        "Logical Replication Launcher sublist",
                               1196                 :                :                                        ALLOCSET_DEFAULT_SIZES);
                               1197                 :           2669 :         oldctx = MemoryContextSwitchTo(subctx);
                               1198                 :                : 
                               1199                 :                :         /*
                               1200                 :                :          * Start any missing workers for enabled subscriptions.
                               1201                 :                :          *
                               1202                 :                :          * Also, during the iteration through all subscriptions, we compute
                               1203                 :                :          * the minimum XID required to protect deleted tuples for conflict
                               1204                 :                :          * detection if one of the subscription enables retain_dead_tuples
                               1205                 :                :          * option.
                               1206                 :                :          */
                               1207                 :           2669 :         sublist = get_subscription_list();
                               1208   [ +  +  +  +  :           3489 :         foreach(lc, sublist)
                                              +  + ]
                               1209                 :                :         {
                               1210                 :            822 :             Subscription *sub = (Subscription *) lfirst(lc);
                               1211                 :                :             LogicalRepWorker *w;
                               1212                 :                :             TimestampTz last_start;
                               1213                 :                :             TimestampTz now;
                               1214                 :                :             long        elapsed;
                               1215                 :                : 
   45 akapila@postgresql.o     1216         [ +  + ]:GNC         822 :             if (sub->retaindeadtuples)
                               1217                 :                :             {
                               1218                 :              5 :                 retain_dead_tuples = true;
                               1219                 :                : 
                               1220                 :                :                 /*
                               1221                 :                :                  * Create a replication slot to retain information necessary
                               1222                 :                :                  * for conflict detection such as dead tuples, commit
                               1223                 :                :                  * timestamps, and origins.
                               1224                 :                :                  *
                               1225                 :                :                  * The slot is created before starting the apply worker to
                               1226                 :                :                  * prevent it from unnecessarily maintaining its
                               1227                 :                :                  * oldest_nonremovable_xid.
                               1228                 :                :                  *
                               1229                 :                :                  * The slot is created even for a disabled subscription to
                               1230                 :                :                  * ensure that conflict-related information is available when
                               1231                 :                :                  * applying remote changes that occurred before the
                               1232                 :                :                  * subscription was enabled.
                               1233                 :                :                  */
                               1234                 :              5 :                 CreateConflictDetectionSlot();
                               1235                 :                : 
    4                          1236         [ +  - ]:              5 :                 if (sub->retentionactive)
                               1237                 :                :                 {
                               1238                 :                :                     /*
                               1239                 :                :                      * Can't advance xmin of the slot unless all the
                               1240                 :                :                      * subscriptions actively retaining dead tuples are
                               1241                 :                :                      * enabled. This is required to ensure that we don't
                               1242                 :                :                      * advance the xmin of CONFLICT_DETECTION_SLOT if one of
                               1243                 :                :                      * the subscriptions is not enabled. Otherwise, we won't
                               1244                 :                :                      * be able to detect conflicts reliably for such a
                               1245                 :                :                      * subscription even though it has set the
                               1246                 :                :                      * retain_dead_tuples option.
                               1247                 :                :                      */
                               1248                 :              5 :                     can_update_xmin &= sub->enabled;
                               1249                 :                : 
                               1250                 :                :                     /*
                               1251                 :                :                      * Initialize the slot once the subscription activiates
                               1252                 :                :                      * retention.
                               1253                 :                :                      */
                               1254         [ -  + ]:              5 :                     if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
    4 akapila@postgresql.o     1255                 :UNC           0 :                         init_conflict_slot_xmin();
                               1256                 :                :                 }
                               1257                 :                :             }
                               1258                 :                : 
  958 tgl@sss.pgh.pa.us        1259         [ +  + ]:CBC         822 :             if (!sub->enabled)
                               1260                 :             36 :                 continue;
                               1261                 :                : 
                               1262                 :            786 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1263                 :            786 :             w = logicalrep_worker_find(sub->oid, InvalidOid, false);
                               1264                 :            786 :             LWLockRelease(LogicalRepWorkerLock);
                               1265                 :                : 
                               1266         [ +  + ]:            786 :             if (w != NULL)
                               1267                 :                :             {
                               1268                 :                :                 /*
                               1269                 :                :                  * Compute the minimum xmin required to protect dead tuples
                               1270                 :                :                  * required for conflict detection among all running apply
                               1271                 :                :                  * workers.
                               1272                 :                :                  */
    4 akapila@postgresql.o     1273         [ +  + ]:GNC         487 :                 if (sub->retaindeadtuples &&
                               1274   [ +  -  +  - ]:              3 :                     sub->retentionactive &&
                               1275                 :                :                     can_update_xmin)
   45                          1276                 :              3 :                     compute_min_nonremovable_xid(w, &xmin);
                               1277                 :                : 
                               1278                 :                :                 /* worker is running already */
                               1279                 :            487 :                 continue;
                               1280                 :                :             }
                               1281                 :                : 
                               1282                 :                :             /*
                               1283                 :                :              * Can't advance xmin of the slot unless all the workers
                               1284                 :                :              * corresponding to subscriptions actively retaining dead tuples
                               1285                 :                :              * are running, disabling the further computation of the minimum
                               1286                 :                :              * nonremovable xid.
                               1287                 :                :              */
    4                          1288   [ +  +  +  - ]:            299 :             if (sub->retaindeadtuples && sub->retentionactive)
                               1289                 :              1 :                 can_update_xmin = false;
                               1290                 :                : 
                               1291                 :                :             /*
                               1292                 :                :              * If the worker is eligible to start now, launch it.  Otherwise,
                               1293                 :                :              * adjust wait_time so that we'll wake up as soon as it can be
                               1294                 :                :              * started.
                               1295                 :                :              *
                               1296                 :                :              * Each subscription's apply worker can only be restarted once per
                               1297                 :                :              * wal_retrieve_retry_interval, so that errors do not cause us to
                               1298                 :                :              * repeatedly restart the worker as fast as possible.  In cases
                               1299                 :                :              * where a restart is expected (e.g., subscription parameter
                               1300                 :                :              * changes), another process should remove the last-start entry
                               1301                 :                :              * for the subscription so that the worker can be restarted
                               1302                 :                :              * without waiting for wal_retrieve_retry_interval to elapse.
                               1303                 :                :              */
  958 tgl@sss.pgh.pa.us        1304                 :CBC         299 :             last_start = ApplyLauncherGetWorkerStartTime(sub->oid);
                               1305                 :            299 :             now = GetCurrentTimestamp();
                               1306         [ +  + ]:            299 :             if (last_start == 0 ||
                               1307         [ +  + ]:            173 :                 (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
                               1308                 :                :             {
                               1309                 :            204 :                 ApplyLauncherSetWorkerStartTime(sub->oid, now);
   74                          1310         [ -  + ]:            202 :                 if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                               1311                 :            204 :                                               sub->dbid, sub->oid, sub->name,
                               1312                 :                :                                               sub->owner, InvalidOid,
                               1313                 :                :                                               DSM_HANDLE_INVALID,
    4 akapila@postgresql.o     1314         [ +  + ]:GNC         205 :                                               sub->retaindeadtuples &&
                               1315         [ +  - ]:              1 :                                               sub->retentionactive))
                               1316                 :                :                 {
                               1317                 :                :                     /*
                               1318                 :                :                      * We get here either if we failed to launch a worker
                               1319                 :                :                      * (perhaps for resource-exhaustion reasons) or if we
                               1320                 :                :                      * launched one but it immediately quit.  Either way, it
                               1321                 :                :                      * seems appropriate to try again after
                               1322                 :                :                      * wal_retrieve_retry_interval.
                               1323                 :                :                      */
   74 tgl@sss.pgh.pa.us        1324                 :LBC         (1) :                     wait_time = Min(wait_time,
                               1325                 :                :                                     wal_retrieve_retry_interval);
                               1326                 :                :                 }
                               1327                 :                :             }
                               1328                 :                :             else
                               1329                 :                :             {
  958 tgl@sss.pgh.pa.us        1330                 :CBC          95 :                 wait_time = Min(wait_time,
                               1331                 :                :                                 wal_retrieve_retry_interval - elapsed);
                               1332                 :                :             }
                               1333                 :                :         }
                               1334                 :                : 
                               1335                 :                :         /*
                               1336                 :                :          * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
                               1337                 :                :          * that requires us to retain dead tuples. Otherwise, if required,
                               1338                 :                :          * advance the slot's xmin to protect dead tuples required for the
                               1339                 :                :          * conflict detection.
                               1340                 :                :          *
                               1341                 :                :          * Additionally, if all apply workers for subscriptions with
                               1342                 :                :          * retain_dead_tuples enabled have requested to stop retention, the
                               1343                 :                :          * slot's xmin will be set to InvalidTransactionId allowing the
                               1344                 :                :          * removal of dead tuples.
                               1345                 :                :          */
   45 akapila@postgresql.o     1346         [ +  + ]:GNC        2667 :         if (MyReplicationSlot)
                               1347                 :                :         {
                               1348         [ +  + ]:              6 :             if (!retain_dead_tuples)
                               1349                 :              1 :                 ReplicationSlotDropAcquired();
    4                          1350         [ +  + ]:              5 :             else if (can_update_xmin)
                               1351                 :              3 :                 update_conflict_slot_xmin(xmin);
                               1352                 :                :         }
                               1353                 :                : 
                               1354                 :                :         /* Switch back to original memory context. */
  958 tgl@sss.pgh.pa.us        1355                 :CBC        2667 :         MemoryContextSwitchTo(oldctx);
                               1356                 :                :         /* Clean the temporary memory. */
                               1357                 :           2667 :         MemoryContextDelete(subctx);
                               1358                 :                : 
                               1359                 :                :         /* Wait for more work. */
 3014 andres@anarazel.de       1360                 :           2667 :         rc = WaitLatch(MyLatch,
                               1361                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1362                 :                :                        wait_time,
                               1363                 :                :                        WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
                               1364                 :                : 
                               1365         [ +  + ]:           2664 :         if (rc & WL_LATCH_SET)
                               1366                 :                :         {
                               1367                 :           2647 :             ResetLatch(MyLatch);
                               1368         [ +  + ]:           2647 :             CHECK_FOR_INTERRUPTS();
                               1369                 :                :         }
                               1370                 :                : 
 2090 rhaas@postgresql.org     1371         [ +  + ]:           2283 :         if (ConfigReloadPending)
                               1372                 :                :         {
                               1373                 :             40 :             ConfigReloadPending = false;
 3071 peter_e@gmx.net          1374                 :             40 :             ProcessConfigFile(PGC_SIGHUP);
                               1375                 :                :         }
                               1376                 :                :     }
                               1377                 :                : 
                               1378                 :                :     /* Not reachable */
                               1379                 :                : }
                               1380                 :                : 
                               1381                 :                : /*
                               1382                 :                :  * Determine the minimum non-removable transaction ID across all apply workers
                               1383                 :                :  * for subscriptions that have retain_dead_tuples enabled. Store the result
                               1384                 :                :  * in *xmin.
                               1385                 :                :  */
                               1386                 :                : static void
   45 akapila@postgresql.o     1387                 :GNC           3 : compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
                               1388                 :                : {
                               1389                 :                :     TransactionId nonremovable_xid;
                               1390                 :                : 
                               1391         [ -  + ]:              3 :     Assert(worker != NULL);
                               1392                 :                : 
                               1393                 :                :     /*
                               1394                 :                :      * The replication slot for conflict detection must be created before the
                               1395                 :                :      * worker starts.
                               1396                 :                :      */
                               1397         [ -  + ]:              3 :     Assert(MyReplicationSlot);
                               1398                 :                : 
                               1399         [ -  + ]:              3 :     SpinLockAcquire(&worker->relmutex);
                               1400                 :              3 :     nonremovable_xid = worker->oldest_nonremovable_xid;
                               1401                 :              3 :     SpinLockRelease(&worker->relmutex);
                               1402                 :                : 
                               1403                 :                :     /*
                               1404                 :                :      * Return if the apply worker has stopped retention concurrently.
                               1405                 :                :      *
                               1406                 :                :      * Although this function is invoked only when retentionactive is true,
                               1407                 :                :      * the apply worker might stop retention after the launcher fetches the
                               1408                 :                :      * retentionactive flag.
                               1409                 :                :      */
    4                          1410         [ -  + ]:              3 :     if (!TransactionIdIsValid(nonremovable_xid))
    4 akapila@postgresql.o     1411                 :UNC           0 :         return;
                               1412                 :                : 
   45 akapila@postgresql.o     1413   [ -  +  -  - ]:GNC           3 :     if (!TransactionIdIsValid(*xmin) ||
   45 akapila@postgresql.o     1414                 :UNC           0 :         TransactionIdPrecedes(nonremovable_xid, *xmin))
   45 akapila@postgresql.o     1415                 :GNC           3 :         *xmin = nonremovable_xid;
                               1416                 :                : }
                               1417                 :                : 
                               1418                 :                : /*
                               1419                 :                :  * Acquire the replication slot used to retain information for conflict
                               1420                 :                :  * detection, if it exists.
                               1421                 :                :  *
                               1422                 :                :  * Return true if successfully acquired, otherwise return false.
                               1423                 :                :  */
                               1424                 :                : static bool
                               1425                 :            387 : acquire_conflict_slot_if_exists(void)
                               1426                 :                : {
                               1427         [ +  - ]:            387 :     if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
                               1428                 :            387 :         return false;
                               1429                 :                : 
   45 akapila@postgresql.o     1430                 :UNC           0 :     ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
                               1431                 :              0 :     return true;
                               1432                 :                : }
                               1433                 :                : 
                               1434                 :                : /*
                               1435                 :                :  * Update the xmin the replication slot used to retain information required
                               1436                 :                :  * for conflict detection.
                               1437                 :                :  */
                               1438                 :                : static void
    4 akapila@postgresql.o     1439                 :GNC           3 : update_conflict_slot_xmin(TransactionId new_xmin)
                               1440                 :                : {
   45                          1441         [ -  + ]:              3 :     Assert(MyReplicationSlot);
    4                          1442   [ +  -  -  + ]:              3 :     Assert(!TransactionIdIsValid(new_xmin) ||
                               1443                 :                :            TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
                               1444                 :                : 
                               1445                 :                :     /* Return if the xmin value of the slot cannot be updated */
   45                          1446         [ +  + ]:              3 :     if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
                               1447                 :              2 :         return;
                               1448                 :                : 
                               1449         [ -  + ]:              1 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1450                 :              1 :     MyReplicationSlot->effective_xmin = new_xmin;
                               1451                 :              1 :     MyReplicationSlot->data.xmin = new_xmin;
                               1452                 :              1 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1453                 :                : 
                               1454         [ -  + ]:              1 :     elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
                               1455                 :                : 
                               1456                 :              1 :     ReplicationSlotMarkDirty();
                               1457                 :              1 :     ReplicationSlotsComputeRequiredXmin(false);
                               1458                 :                : 
                               1459                 :                :     /*
                               1460                 :                :      * Like PhysicalConfirmReceivedLocation(), do not save slot information
                               1461                 :                :      * each time. This is acceptable because all concurrent transactions on
                               1462                 :                :      * the publisher that require the data preceding the slot's xmin should
                               1463                 :                :      * have already been applied and flushed on the subscriber before the xmin
                               1464                 :                :      * is advanced. So, even if the slot's xmin regresses after a restart, it
                               1465                 :                :      * will be advanced again in the next cycle. Therefore, no data required
                               1466                 :                :      * for conflict detection will be prematurely removed.
                               1467                 :                :      */
                               1468                 :              1 :     return;
                               1469                 :                : }
                               1470                 :                : 
                               1471                 :                : /*
                               1472                 :                :  * Initialize the xmin for the conflict detection slot.
                               1473                 :                :  */
                               1474                 :                : static void
    4                          1475                 :              3 : init_conflict_slot_xmin(void)
                               1476                 :                : {
                               1477                 :                :     TransactionId xmin_horizon;
                               1478                 :                : 
                               1479                 :                :     /* Replication slot must exist but shouldn't be initialized. */
                               1480   [ +  -  -  + ]:              3 :     Assert(MyReplicationSlot &&
                               1481                 :                :            !TransactionIdIsValid(MyReplicationSlot->data.xmin));
                               1482                 :                : 
   45                          1483                 :              3 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
                               1484                 :                : 
                               1485                 :              3 :     xmin_horizon = GetOldestSafeDecodingTransactionId(false);
                               1486                 :                : 
                               1487         [ -  + ]:              3 :     SpinLockAcquire(&MyReplicationSlot->mutex);
                               1488                 :              3 :     MyReplicationSlot->effective_xmin = xmin_horizon;
                               1489                 :              3 :     MyReplicationSlot->data.xmin = xmin_horizon;
                               1490                 :              3 :     SpinLockRelease(&MyReplicationSlot->mutex);
                               1491                 :                : 
                               1492                 :              3 :     ReplicationSlotsComputeRequiredXmin(true);
                               1493                 :                : 
                               1494                 :              3 :     LWLockRelease(ProcArrayLock);
                               1495                 :                : 
                               1496                 :                :     /* Write this slot to disk */
                               1497                 :              3 :     ReplicationSlotMarkDirty();
                               1498                 :              3 :     ReplicationSlotSave();
                               1499                 :              3 : }
                               1500                 :                : 
                               1501                 :                : /*
                               1502                 :                :  * Create and acquire the replication slot used to retain information for
                               1503                 :                :  * conflict detection, if not yet.
                               1504                 :                :  */
                               1505                 :                : void
    4                          1506                 :              6 : CreateConflictDetectionSlot(void)
                               1507                 :                : {
                               1508                 :                :     /* Exit early, if the replication slot is already created and acquired */
                               1509         [ +  + ]:              6 :     if (MyReplicationSlot)
                               1510                 :              3 :         return;
                               1511                 :                : 
                               1512         [ +  - ]:              3 :     ereport(LOG,
                               1513                 :                :             errmsg("creating replication conflict detection slot"));
                               1514                 :                : 
                               1515                 :              3 :     ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
                               1516                 :                :                           false, false);
                               1517                 :                : 
                               1518                 :              3 :     init_conflict_slot_xmin();
                               1519                 :                : }
                               1520                 :                : 
                               1521                 :                : /*
                               1522                 :                :  * Is current process the logical replication launcher?
                               1523                 :                :  */
                               1524                 :                : bool
 3012 andres@anarazel.de       1525                 :CBC        2314 : IsLogicalLauncher(void)
                               1526                 :                : {
                               1527                 :           2314 :     return LogicalRepCtx->launcher_pid == MyProcPid;
                               1528                 :                : }
                               1529                 :                : 
                               1530                 :                : /*
                               1531                 :                :  * Return the pid of the leader apply worker if the given pid is the pid of a
                               1532                 :                :  * parallel apply worker, otherwise, return InvalidPid.
                               1533                 :                :  */
                               1534                 :                : pid_t
  962 akapila@postgresql.o     1535                 :            833 : GetLeaderApplyWorkerPid(pid_t pid)
                               1536                 :                : {
                               1537                 :            833 :     int         leader_pid = InvalidPid;
                               1538                 :                :     int         i;
                               1539                 :                : 
                               1540                 :            833 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1541                 :                : 
                               1542         [ +  + ]:           4165 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1543                 :                :     {
                               1544                 :           3332 :         LogicalRepWorker *w = &LogicalRepCtx->workers[i];
                               1545                 :                : 
                               1546   [ +  +  -  +  :           3332 :         if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid)
                                        -  -  -  - ]
                               1547                 :                :         {
  962 akapila@postgresql.o     1548                 :UBC           0 :             leader_pid = w->leader_pid;
                               1549                 :              0 :             break;
                               1550                 :                :         }
                               1551                 :                :     }
                               1552                 :                : 
  962 akapila@postgresql.o     1553                 :CBC         833 :     LWLockRelease(LogicalRepWorkerLock);
                               1554                 :                : 
                               1555                 :            833 :     return leader_pid;
                               1556                 :                : }
                               1557                 :                : 
                               1558                 :                : /*
                               1559                 :                :  * Returns state of the subscriptions.
                               1560                 :                :  */
                               1561                 :                : Datum
 3152 peter_e@gmx.net          1562                 :              1 : pg_stat_get_subscription(PG_FUNCTION_ARGS)
                               1563                 :                : {
                               1564                 :                : #define PG_STAT_GET_SUBSCRIPTION_COLS   10
                               1565         [ -  + ]:              1 :     Oid         subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
                               1566                 :                :     int         i;
                               1567                 :              1 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1568                 :                : 
 1054 michael@paquier.xyz      1569                 :              1 :     InitMaterializedSRF(fcinfo, 0);
                               1570                 :                : 
                               1571                 :                :     /* Make sure we get consistent view of the workers. */
 3152 peter_e@gmx.net          1572                 :              1 :     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                               1573                 :                : 
 1187 tgl@sss.pgh.pa.us        1574         [ +  + ]:              5 :     for (i = 0; i < max_logical_replication_workers; i++)
                               1575                 :                :     {
                               1576                 :                :         /* for each row */
 1148 peter@eisentraut.org     1577                 :              4 :         Datum       values[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1578                 :              4 :         bool        nulls[PG_STAT_GET_SUBSCRIPTION_COLS] = {0};
                               1579                 :                :         int         worker_pid;
                               1580                 :                :         LogicalRepWorker worker;
                               1581                 :                : 
 3152 peter_e@gmx.net          1582                 :              4 :         memcpy(&worker, &LogicalRepCtx->workers[i],
                               1583                 :                :                sizeof(LogicalRepWorker));
                               1584   [ +  +  -  + ]:              4 :         if (!worker.proc || !IsBackendPid(worker.proc->pid))
                               1585                 :              2 :             continue;
                               1586                 :                : 
                               1587   [ -  +  -  - ]:              2 :         if (OidIsValid(subid) && worker.subid != subid)
 3152 peter_e@gmx.net          1588                 :UBC           0 :             continue;
                               1589                 :                : 
 3152 peter_e@gmx.net          1590                 :CBC           2 :         worker_pid = worker.proc->pid;
                               1591                 :                : 
                               1592                 :              2 :         values[0] = ObjectIdGetDatum(worker.subid);
  754 akapila@postgresql.o     1593   [ +  -  -  + ]:              2 :         if (isTablesyncWorker(&worker))
 3089 peter_e@gmx.net          1594                 :UBC           0 :             values[1] = ObjectIdGetDatum(worker.relid);
                               1595                 :                :         else
 3089 peter_e@gmx.net          1596                 :CBC           2 :             nulls[1] = true;
                               1597                 :              2 :         values[2] = Int32GetDatum(worker_pid);
                               1598                 :                : 
  962 akapila@postgresql.o     1599   [ +  -  -  + ]:              2 :         if (isParallelApplyWorker(&worker))
  962 akapila@postgresql.o     1600                 :UBC           0 :             values[3] = Int32GetDatum(worker.leader_pid);
                               1601                 :                :         else
 3089 peter_e@gmx.net          1602                 :CBC           2 :             nulls[3] = true;
                               1603                 :                : 
  962 akapila@postgresql.o     1604         [ -  + ]:              2 :         if (XLogRecPtrIsInvalid(worker.last_lsn))
  962 akapila@postgresql.o     1605                 :UBC           0 :             nulls[4] = true;
                               1606                 :                :         else
  962 akapila@postgresql.o     1607                 :CBC           2 :             values[4] = LSNGetDatum(worker.last_lsn);
 3152 peter_e@gmx.net          1608         [ -  + ]:              2 :         if (worker.last_send_time == 0)
  962 akapila@postgresql.o     1609                 :UBC           0 :             nulls[5] = true;
                               1610                 :                :         else
  962 akapila@postgresql.o     1611                 :CBC           2 :             values[5] = TimestampTzGetDatum(worker.last_send_time);
 3152 peter_e@gmx.net          1612         [ -  + ]:              2 :         if (worker.last_recv_time == 0)
  962 akapila@postgresql.o     1613                 :UBC           0 :             nulls[6] = true;
                               1614                 :                :         else
  962 akapila@postgresql.o     1615                 :CBC           2 :             values[6] = TimestampTzGetDatum(worker.last_recv_time);
 3152 peter_e@gmx.net          1616         [ -  + ]:              2 :         if (XLogRecPtrIsInvalid(worker.reply_lsn))
  962 akapila@postgresql.o     1617                 :UBC           0 :             nulls[7] = true;
                               1618                 :                :         else
  962 akapila@postgresql.o     1619                 :CBC           2 :             values[7] = LSNGetDatum(worker.reply_lsn);
 3152 peter_e@gmx.net          1620         [ -  + ]:              2 :         if (worker.reply_time == 0)
  962 akapila@postgresql.o     1621                 :UBC           0 :             nulls[8] = true;
                               1622                 :                :         else
  962 akapila@postgresql.o     1623                 :CBC           2 :             values[8] = TimestampTzGetDatum(worker.reply_time);
                               1624                 :                : 
  712 nathan@postgresql.or     1625   [ +  -  -  -  :              2 :         switch (worker.type)
                                                 - ]
                               1626                 :                :         {
                               1627                 :              2 :             case WORKERTYPE_APPLY:
                               1628                 :              2 :                 values[9] = CStringGetTextDatum("apply");
                               1629                 :              2 :                 break;
  712 nathan@postgresql.or     1630                 :UBC           0 :             case WORKERTYPE_PARALLEL_APPLY:
                               1631                 :              0 :                 values[9] = CStringGetTextDatum("parallel apply");
                               1632                 :              0 :                 break;
                               1633                 :              0 :             case WORKERTYPE_TABLESYNC:
                               1634                 :              0 :                 values[9] = CStringGetTextDatum("table synchronization");
                               1635                 :              0 :                 break;
                               1636                 :              0 :             case WORKERTYPE_UNKNOWN:
                               1637                 :                :                 /* Should never happen. */
                               1638         [ #  # ]:              0 :                 elog(ERROR, "unknown worker type");
                               1639                 :                :         }
                               1640                 :                : 
 1279 michael@paquier.xyz      1641                 :CBC           2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
                               1642                 :                :                              values, nulls);
                               1643                 :                : 
                               1644                 :                :         /*
                               1645                 :                :          * If only a single subscription was requested, and we found it,
                               1646                 :                :          * break.
                               1647                 :                :          */
 3152 peter_e@gmx.net          1648         [ -  + ]:              2 :         if (OidIsValid(subid))
 3152 peter_e@gmx.net          1649                 :UBC           0 :             break;
                               1650                 :                :     }
                               1651                 :                : 
 3152 peter_e@gmx.net          1652                 :CBC           1 :     LWLockRelease(LogicalRepWorkerLock);
                               1653                 :                : 
                               1654                 :              1 :     return (Datum) 0;
                               1655                 :                : }
        

Generated by: LCOV version 2.4-beta