LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Coverage Total Hit UNC LBC UBC GBC GNC CBC DUB DCB
Current: 806555e3000d0b0e0c536c1dc65548128d457d86 vs 1d325ad99cb2dec0e8b45ba36909ee0a497d2a57 Lines: 90.8 % 502 456 4 42 31 425 4 77
Current Date: 2025-12-17 08:58:58 +0900 Functions: 100.0 % 16 16 11 5 9
Baseline: lcov-20251217-005640-baseline Branches: 63.3 % 300 190 4 1 105 1 16 173
Baseline Date: 2025-12-16 12:57:12 -0800 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 3 3 3
(30,360] days: 91.3 % 46 42 4 28 14
(360..) days: 90.7 % 453 411 42 411
Function coverage date bins:
(7,30] days: 100.0 % 1 1 1
(30,360] days: 100.0 % 5 5 5
(360..) days: 100.0 % 10 10 5 5
Branch coverage date bins:
(7,30] days: 87.5 % 8 7 1 7
(30,360] days: 77.8 % 18 14 3 1 9 5
(360..) days: 61.7 % 274 169 105 1 168

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * tablesync.c
                                  3                 :                :  *    PostgreSQL logical replication: initial table data synchronization
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/tablesync.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This file contains code for initial table data synchronization for
                                 12                 :                :  *    logical replication.
                                 13                 :                :  *
                                 14                 :                :  *    The initial data synchronization is done separately for each table,
                                 15                 :                :  *    in a separate apply worker that only fetches the initial snapshot data
                                 16                 :                :  *    from the publisher and then synchronizes the position in the stream with
                                 17                 :                :  *    the leader apply worker.
                                 18                 :                :  *
                                 19                 :                :  *    There are several reasons for doing the synchronization this way:
                                 20                 :                :  *     - It allows us to parallelize the initial data synchronization
                                 21                 :                :  *       which lowers the time needed for it to happen.
                                 22                 :                :  *     - The initial synchronization does not have to hold the xid and LSN
                                 23                 :                :  *       for the time it takes to copy data of all tables, causing less
                                 24                 :                :  *       bloat and lower disk consumption compared to doing the
                                 25                 :                :  *       synchronization in a single process for the whole database.
                                 26                 :                :  *     - It allows us to synchronize any tables added after the initial
                                 27                 :                :  *       synchronization has finished.
                                 28                 :                :  *
                                 29                 :                :  *    The stream position synchronization works in multiple steps:
                                 30                 :                :  *     - Apply worker requests a tablesync worker to start, setting the new
                                 31                 :                :  *       table state to INIT.
                                 32                 :                :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
                                 33                 :                :  *       copying.
                                 34                 :                :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
                                 35                 :                :  *       worker specific) state to indicate when the copy phase has completed, so
                                 36                 :                :  *       if the worker crashes with this (non-memory) state then the copy will not
                                 37                 :                :  *       be re-attempted.
                                 38                 :                :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
                                 39                 :                :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
                                 40                 :                :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
                                 41                 :                :  *       until either the table state is set to SYNCDONE or the sync worker
                                 42                 :                :  *       exits.
                                 43                 :                :  *     - After the sync worker has seen the state change to CATCHUP, it will
                                 44                 :                :  *       read the stream and apply changes (acting like an apply worker) until
                                 45                 :                :  *       it catches up to the specified stream position.  Then it sets the
                                 46                 :                :  *       state to SYNCDONE.  There might be zero changes applied between
                                 47                 :                :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
                                 48                 :                :  *       apply worker.
                                 49                 :                :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
                                 50                 :                :  *       the table until it reaches the SYNCDONE stream position, at which
                                 51                 :                :  *       point it sets state to READY and stops tracking.  Again, there might
                                 52                 :                :  *       be zero changes in between.
                                 53                 :                :  *
                                 54                 :                :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
                                 55                 :                :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
                                 56                 :                :  *
                                 57                 :                :  *    The catalog pg_subscription_rel is used to keep information about
                                 58                 :                :  *    subscribed tables and their state.  The catalog holds all states
                                 59                 :                :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
                                 60                 :                :  *
                                 61                 :                :  *    Example flows look like this:
                                 62                 :                :  *     - Apply is in front:
                                 63                 :                :  *        sync:8
                                 64                 :                :  *          -> set in catalog FINISHEDCOPY
                                 65                 :                :  *          -> set in memory SYNCWAIT
                                 66                 :                :  *        apply:10
                                 67                 :                :  *          -> set in memory CATCHUP
                                 68                 :                :  *          -> enter wait-loop
                                 69                 :                :  *        sync:10
                                 70                 :                :  *          -> set in catalog SYNCDONE
                                 71                 :                :  *          -> exit
                                 72                 :                :  *        apply:10
                                 73                 :                :  *          -> exit wait-loop
                                 74                 :                :  *          -> continue rep
                                 75                 :                :  *        apply:11
                                 76                 :                :  *          -> set in catalog READY
                                 77                 :                :  *
                                 78                 :                :  *     - Sync is in front:
                                 79                 :                :  *        sync:10
                                 80                 :                :  *          -> set in catalog FINISHEDCOPY
                                 81                 :                :  *          -> set in memory SYNCWAIT
                                 82                 :                :  *        apply:8
                                 83                 :                :  *          -> set in memory CATCHUP
                                 84                 :                :  *          -> continue per-table filtering
                                 85                 :                :  *        sync:10
                                 86                 :                :  *          -> set in catalog SYNCDONE
                                 87                 :                :  *          -> exit
                                 88                 :                :  *        apply:10
                                 89                 :                :  *          -> set in catalog READY
                                 90                 :                :  *          -> stop per-table filtering
                                 91                 :                :  *          -> continue rep
                                 92                 :                :  *-------------------------------------------------------------------------
                                 93                 :                :  */
                                 94                 :                : 
                                 95                 :                : #include "postgres.h"
                                 96                 :                : 
                                 97                 :                : #include "access/table.h"
                                 98                 :                : #include "access/xact.h"
                                 99                 :                : #include "catalog/indexing.h"
                                100                 :                : #include "catalog/pg_subscription_rel.h"
                                101                 :                : #include "catalog/pg_type.h"
                                102                 :                : #include "commands/copy.h"
                                103                 :                : #include "miscadmin.h"
                                104                 :                : #include "nodes/makefuncs.h"
                                105                 :                : #include "parser/parse_relation.h"
                                106                 :                : #include "pgstat.h"
                                107                 :                : #include "replication/logicallauncher.h"
                                108                 :                : #include "replication/logicalrelation.h"
                                109                 :                : #include "replication/logicalworker.h"
                                110                 :                : #include "replication/origin.h"
                                111                 :                : #include "replication/slot.h"
                                112                 :                : #include "replication/walreceiver.h"
                                113                 :                : #include "replication/worker_internal.h"
                                114                 :                : #include "storage/ipc.h"
                                115                 :                : #include "storage/lmgr.h"
                                116                 :                : #include "utils/acl.h"
                                117                 :                : #include "utils/array.h"
                                118                 :                : #include "utils/builtins.h"
                                119                 :                : #include "utils/lsyscache.h"
                                120                 :                : #include "utils/rls.h"
                                121                 :                : #include "utils/snapmgr.h"
                                122                 :                : #include "utils/syscache.h"
                                123                 :                : #include "utils/usercontext.h"
                                124                 :                : 
                                125                 :                : List       *table_states_not_ready = NIL;
                                126                 :                : 
                                127                 :                : static StringInfo copybuf = NULL;
                                128                 :                : 
                                129                 :                : /*
                                130                 :                :  * Wait until the relation sync state is set in the catalog to the expected
                                131                 :                :  * one; return true when it happens.
                                132                 :                :  *
                                133                 :                :  * Returns false if the table sync worker or the table itself have
                                134                 :                :  * disappeared, or the table state has been reset.
                                135                 :                :  *
                                136                 :                :  * Currently, this is used in the apply worker when transitioning from
                                137                 :                :  * CATCHUP state to SYNCDONE.
                                138                 :                :  */
                                139                 :                : static bool
   62 akapila@postgresql.o      140                 :GNC         183 : wait_for_table_state_change(Oid relid, char expected_state)
                                141                 :                : {
                                142                 :                :     char        state;
                                143                 :                : 
                                144                 :                :     for (;;)
 3191 peter_e@gmx.net           145                 :CBC         202 :     {
                                146                 :                :         LogicalRepWorker *worker;
                                147                 :                :         XLogRecPtr  statelsn;
                                148                 :                : 
 3120                           149         [ -  + ]:            385 :         CHECK_FOR_INTERRUPTS();
                                150                 :                : 
 1889 alvherre@alvh.no-ip.      151                 :            385 :         InvalidateCatalogSnapshot();
 3116 peter_e@gmx.net           152                 :            385 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                153                 :                :                                         relid, &statelsn);
                                154                 :                : 
                                155         [ -  + ]:            385 :         if (state == SUBREL_STATE_UNKNOWN)
 1889 alvherre@alvh.no-ip.      156                 :UBC           0 :             break;
                                157                 :                : 
 3116 peter_e@gmx.net           158         [ +  + ]:CBC         385 :         if (state == expected_state)
                                159                 :            183 :             return true;
                                160                 :                : 
                                161                 :                :         /* Check if the sync worker is still running and bail if not. */
 3191                           162                 :            202 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
   50 akapila@postgresql.o      163                 :GNC         202 :         worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
                                164                 :            202 :                                         MyLogicalRepWorker->subid, relid,
                                165                 :                :                                         false);
 3116 peter_e@gmx.net           166                 :CBC         202 :         LWLockRelease(LogicalRepWorkerLock);
 3191                           167         [ -  + ]:            202 :         if (!worker)
 1889 alvherre@alvh.no-ip.      168                 :UBC           0 :             break;
                                169                 :                : 
 2581 tmunro@postgresql.or      170                 :CBC         202 :         (void) WaitLatch(MyLatch,
                                171                 :                :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                172                 :                :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                173                 :                : 
 3116 andres@anarazel.de        174                 :            202 :         ResetLatch(MyLatch);
                                175                 :                :     }
                                176                 :                : 
 3116 peter_e@gmx.net           177                 :UBC           0 :     return false;
                                178                 :                : }
                                179                 :                : 
                                180                 :                : /*
                                181                 :                :  * Wait until the apply worker changes the state of our synchronization
                                182                 :                :  * worker to the expected one.
                                183                 :                :  *
                                184                 :                :  * Used when transitioning from SYNCWAIT state to CATCHUP.
                                185                 :                :  *
                                186                 :                :  * Returns false if the apply worker has disappeared.
                                187                 :                :  */
                                188                 :                : static bool
 3116 peter_e@gmx.net           189                 :CBC         185 : wait_for_worker_state_change(char expected_state)
                                190                 :                : {
                                191                 :                :     int         rc;
                                192                 :                : 
                                193                 :                :     for (;;)
                                194                 :            185 :     {
                                195                 :                :         LogicalRepWorker *worker;
                                196                 :                : 
                                197         [ -  + ]:            370 :         CHECK_FOR_INTERRUPTS();
                                198                 :                : 
                                199                 :                :         /*
                                200                 :                :          * Done if already in correct state.  (We assume this fetch is atomic
                                201                 :                :          * enough to not give a misleading answer if we do it with no lock.)
                                202                 :                :          */
 3092 tgl@sss.pgh.pa.us         203         [ +  + ]:            370 :         if (MyLogicalRepWorker->relstate == expected_state)
                                204                 :            185 :             return true;
                                205                 :                : 
                                206                 :                :         /*
                                207                 :                :          * Bail out if the apply worker has died, else signal it we're
                                208                 :                :          * waiting.
                                209                 :                :          */
 3116 peter_e@gmx.net           210                 :            185 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
   50 akapila@postgresql.o      211                 :GNC         185 :         worker = logicalrep_worker_find(WORKERTYPE_APPLY,
                                212                 :            185 :                                         MyLogicalRepWorker->subid, InvalidOid,
                                213                 :                :                                         false);
 3092 tgl@sss.pgh.pa.us         214   [ +  -  +  - ]:CBC         185 :         if (worker && worker->proc)
                                215                 :            185 :             logicalrep_worker_wakeup_ptr(worker);
 3116 peter_e@gmx.net           216                 :            185 :         LWLockRelease(LogicalRepWorkerLock);
                                217         [ -  + ]:            185 :         if (!worker)
 3092 tgl@sss.pgh.pa.us         218                 :UBC           0 :             break;
                                219                 :                : 
                                220                 :                :         /*
                                221                 :                :          * Wait.  We expect to get a latch signal back from the apply worker,
                                222                 :                :          * but use a timeout in case it dies without sending one.
                                223                 :                :          */
 3116 andres@anarazel.de        224                 :CBC         185 :         rc = WaitLatch(MyLatch,
                                225                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                226                 :                :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                227                 :                : 
 3092 tgl@sss.pgh.pa.us         228         [ +  - ]:            185 :         if (rc & WL_LATCH_SET)
                                229                 :            185 :             ResetLatch(MyLatch);
                                230                 :                :     }
                                231                 :                : 
 3191 peter_e@gmx.net           232                 :UBC           0 :     return false;
                                233                 :                : }
                                234                 :                : 
                                235                 :                : /*
                                236                 :                :  * Handle table synchronization cooperation from the synchronization
                                237                 :                :  * worker.
                                238                 :                :  *
                                239                 :                :  * If the sync worker is in CATCHUP state and reached (or passed) the
                                240                 :                :  * predetermined synchronization point in the WAL stream, mark the table as
                                241                 :                :  * SYNCDONE and finish.
                                242                 :                :  */
                                243                 :                : void
   62 akapila@postgresql.o      244                 :GNC         200 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
                                245                 :                : {
 3191 peter_e@gmx.net           246         [ -  + ]:CBC         200 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                                247                 :                : 
                                248         [ +  - ]:            200 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
                                249         [ +  + ]:            200 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
                                250                 :                :     {
                                251                 :                :         TimeLineID  tli;
 1769 akapila@postgresql.o      252                 :            185 :         char        syncslotname[NAMEDATALEN] = {0};
 1205                           253                 :            185 :         char        originname[NAMEDATALEN] = {0};
                                254                 :                : 
 3116 peter_e@gmx.net           255                 :            185 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 3191                           256                 :            185 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
                                257                 :                : 
                                258                 :            185 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                259                 :                : 
                                260                 :                :         /*
                                261                 :                :          * UpdateSubscriptionRelState must be called within a transaction.
                                262                 :                :          */
 1769 akapila@postgresql.o      263         [ +  - ]:            185 :         if (!IsTransactionState())
                                264                 :            185 :             StartTransactionCommand();
                                265                 :                : 
 2812 peter_e@gmx.net           266                 :            185 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                267                 :            185 :                                    MyLogicalRepWorker->relid,
                                268                 :            185 :                                    MyLogicalRepWorker->relstate,
  138 akapila@postgresql.o      269                 :            185 :                                    MyLogicalRepWorker->relstate_lsn,
                                270                 :                :                                    false);
                                271                 :                : 
                                272                 :                :         /*
                                273                 :                :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                                274                 :                :          * the slot.
                                275                 :                :          */
 1680 alvherre@alvh.no-ip.      276                 :            185 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                                277                 :                : 
                                278                 :                :         /*
                                279                 :                :          * Cleanup the tablesync slot.
                                280                 :                :          *
                                281                 :                :          * This has to be done after updating the state because otherwise if
                                282                 :                :          * there is an error while doing the database operations we won't be
                                283                 :                :          * able to rollback dropped slot.
                                284                 :                :          */
 1769 akapila@postgresql.o      285                 :            185 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                286                 :            185 :                                         MyLogicalRepWorker->relid,
                                287                 :                :                                         syncslotname,
                                288                 :                :                                         sizeof(syncslotname));
                                289                 :                : 
                                290                 :                :         /*
                                291                 :                :          * It is important to give an error if we are unable to drop the slot,
                                292                 :                :          * otherwise, it won't be dropped till the corresponding subscription
                                293                 :                :          * is dropped. So passing missing_ok = false.
                                294                 :                :          */
 1680 alvherre@alvh.no-ip.      295                 :            185 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
                                296                 :                : 
 1192 akapila@postgresql.o      297                 :            185 :         CommitTransactionCommand();
                                298                 :            185 :         pgstat_report_stat(false);
                                299                 :                : 
                                300                 :                :         /*
                                301                 :                :          * Start a new transaction to clean up the tablesync origin tracking.
                                302                 :                :          * This transaction will be ended within the FinishSyncWorker(). Now,
                                303                 :                :          * even, if we fail to remove this here, the apply worker will ensure
                                304                 :                :          * to clean it up afterward.
                                305                 :                :          *
                                306                 :                :          * We need to do this after the table state is set to SYNCDONE.
                                307                 :                :          * Otherwise, if an error occurs while performing the database
                                308                 :                :          * operation, the worker will be restarted and the in-memory state of
                                309                 :                :          * replication progress (remote_lsn) won't be rolled-back which would
                                310                 :                :          * have been cleared before restart. So, the restarted worker will use
                                311                 :                :          * invalid replication progress state resulting in replay of
                                312                 :                :          * transactions that have already been applied.
                                313                 :                :          */
                                314                 :            185 :         StartTransactionCommand();
                                315                 :                : 
 1163                           316                 :            185 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                317                 :            185 :                                            MyLogicalRepWorker->relid,
                                318                 :                :                                            originname,
                                319                 :                :                                            sizeof(originname));
                                320                 :                : 
                                321                 :                :         /*
                                322                 :                :          * Resetting the origin session removes the ownership of the slot.
                                323                 :                :          * This is needed to allow the origin to be dropped.
                                324                 :                :          */
 1192                           325                 :            185 :         replorigin_session_reset();
                                326                 :            185 :         replorigin_session_origin = InvalidRepOriginId;
                                327                 :            185 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                328                 :            185 :         replorigin_session_origin_timestamp = 0;
                                329                 :                : 
                                330                 :                :         /*
                                331                 :                :          * Drop the tablesync's origin tracking if exists.
                                332                 :                :          *
                                333                 :                :          * There is a chance that the user is concurrently performing refresh
                                334                 :                :          * for the subscription where we remove the table state and its origin
                                335                 :                :          * or the apply worker would have removed this origin. So passing
                                336                 :                :          * missing_ok = true.
                                337                 :                :          */
                                338                 :            185 :         replorigin_drop_by_name(originname, true, false);
                                339                 :                : 
   62 akapila@postgresql.o      340                 :GNC         185 :         FinishSyncWorker();
                                341                 :                :     }
                                342                 :                :     else
 3191 peter_e@gmx.net           343                 :CBC          15 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                344                 :             15 : }
                                345                 :                : 
                                346                 :                : /*
                                347                 :                :  * Handle table synchronization cooperation from the apply worker.
                                348                 :                :  *
                                349                 :                :  * Walk over all subscription tables that are individually tracked by the
                                350                 :                :  * apply process (currently, all that have state other than
                                351                 :                :  * SUBREL_STATE_READY) and manage synchronization for them.
                                352                 :                :  *
                                353                 :                :  * If there are tables that need synchronizing and are not being synchronized
                                354                 :                :  * yet, start sync workers for them (if there are free slots for sync
                                355                 :                :  * workers).  To prevent starting the sync worker for the same relation at a
                                356                 :                :  * high frequency after a failure, we store its last start time with each sync
                                357                 :                :  * state info.  We start the sync worker for the same relation after waiting
                                358                 :                :  * at least wal_retrieve_retry_interval.
                                359                 :                :  *
                                360                 :                :  * For tables that are being synchronized already, check if sync workers
                                361                 :                :  * either need action from the apply worker or have finished.  This is the
                                362                 :                :  * SYNCWAIT to CATCHUP transition.
                                363                 :                :  *
                                364                 :                :  * If the synchronization position is reached (SYNCDONE), then the table can
                                365                 :                :  * be marked as READY and is no longer tracked.
                                366                 :                :  */
                                367                 :                : void
   62 akapila@postgresql.o      368                 :GNC        3812 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
                                369                 :                : {
                                370                 :                :     struct tablesync_start_time_mapping
                                371                 :                :     {
                                372                 :                :         Oid         relid;
                                373                 :                :         TimestampTz last_start_time;
                                374                 :                :     };
                                375                 :                :     static HTAB *last_start_times = NULL;
                                376                 :                :     ListCell   *lc;
                                377                 :                :     bool        started_tx;
 1076 tgl@sss.pgh.pa.us         378                 :CBC        3812 :     bool        should_exit = false;
  138 akapila@postgresql.o      379                 :           3812 :     Relation    rel = NULL;
                                380                 :                : 
 3191 peter_e@gmx.net           381         [ -  + ]:           3812 :     Assert(!IsTransactionState());
                                382                 :                : 
                                383                 :                :     /* We need up-to-date sync state info for subscription tables here. */
   42 akapila@postgresql.o      384                 :GNC        3812 :     FetchRelationStates(NULL, NULL, &started_tx);
                                385                 :                : 
                                386                 :                :     /*
                                387                 :                :      * Prepare a hash table for tracking last start times of workers, to avoid
                                388                 :                :      * immediate restarts.  We don't need it if there are no tables that need
                                389                 :                :      * syncing.
                                390                 :                :      */
 1218 tgl@sss.pgh.pa.us         391   [ +  +  +  + ]:CBC        3812 :     if (table_states_not_ready != NIL && !last_start_times)
 3156 peter_e@gmx.net           392                 :            123 :     {
                                393                 :                :         HASHCTL     ctl;
                                394                 :                : 
                                395                 :            123 :         ctl.keysize = sizeof(Oid);
                                396                 :            123 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
                                397                 :            123 :         last_start_times = hash_create("Logical replication table sync worker start times",
                                398                 :                :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
                                399                 :                :     }
                                400                 :                : 
                                401                 :                :     /*
                                402                 :                :      * Clean up the hash table when we're done with all tables (just to
                                403                 :                :      * release the bit of memory).
                                404                 :                :      */
 1218 tgl@sss.pgh.pa.us         405   [ +  +  +  + ]:           3689 :     else if (table_states_not_ready == NIL && last_start_times)
                                406                 :                :     {
 3156 peter_e@gmx.net           407                 :             92 :         hash_destroy(last_start_times);
                                408                 :             92 :         last_start_times = NULL;
                                409                 :                :     }
                                410                 :                : 
                                411                 :                :     /*
                                412                 :                :      * Process all tables that are being synchronized.
                                413                 :                :      */
 1617 akapila@postgresql.o      414   [ +  +  +  +  :           5634 :     foreach(lc, table_states_not_ready)
                                              +  + ]
                                415                 :                :     {
 3136 bruce@momjian.us          416                 :           1823 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
                                417                 :                : 
   42 akapila@postgresql.o      418         [ +  + ]:GNC        1823 :         if (!started_tx)
                                419                 :                :         {
                                420                 :            303 :             StartTransactionCommand();
                                421                 :            303 :             started_tx = true;
                                422                 :                :         }
                                423                 :                : 
                                424         [ -  + ]:           1823 :         Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
                                425                 :                : 
 3191 peter_e@gmx.net           426         [ +  + ]:CBC        1823 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
                                427                 :                :         {
                                428                 :                :             /*
                                429                 :                :              * Apply has caught up to the position where the table sync has
                                430                 :                :              * finished.  Mark the table as ready so that the apply will just
                                431                 :                :              * continue to replicate it normally.
                                432                 :                :              */
                                433         [ +  + ]:            182 :             if (current_lsn >= rstate->lsn)
                                434                 :                :             {
                                435                 :                :                 char        originname[NAMEDATALEN];
                                436                 :                : 
                                437                 :            181 :                 rstate->state = SUBREL_STATE_READY;
                                438                 :            181 :                 rstate->lsn = current_lsn;
                                439                 :                : 
                                440                 :                :                 /*
                                441                 :                :                  * Remove the tablesync origin tracking if exists.
                                442                 :                :                  *
                                443                 :                :                  * There is a chance that the user is concurrently performing
                                444                 :                :                  * refresh for the subscription where we remove the table
                                445                 :                :                  * state and its origin or the tablesync worker would have
                                446                 :                :                  * already removed this origin. We can't rely on tablesync
                                447                 :                :                  * worker to remove the origin tracking as if there is any
                                448                 :                :                  * error while dropping we won't restart it to drop the
                                449                 :                :                  * origin. So passing missing_ok = true.
                                450                 :                :                  *
                                451                 :                :                  * Lock the subscription and origin in the same order as we
                                452                 :                :                  * are doing during DDL commands to avoid deadlocks. See
                                453                 :                :                  * AlterSubscription_refresh.
                                454                 :                :                  */
  138 akapila@postgresql.o      455                 :            181 :                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
                                456                 :                :                                  0, AccessShareLock);
                                457                 :                : 
                                458         [ +  - ]:            181 :                 if (!rel)
                                459                 :            181 :                     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                460                 :                : 
 1163                           461                 :            181 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                462                 :                :                                                    rstate->relid,
                                463                 :                :                                                    originname,
                                464                 :                :                                                    sizeof(originname));
 1192                           465                 :            181 :                 replorigin_drop_by_name(originname, true, false);
                                466                 :                : 
                                467                 :                :                 /*
                                468                 :                :                  * Update the state to READY only after the origin cleanup.
                                469                 :                :                  */
 2812 peter_e@gmx.net           470                 :            181 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                471                 :            181 :                                            rstate->relid, rstate->state,
                                472                 :                :                                            rstate->lsn, true);
                                473                 :                :             }
                                474                 :                :         }
                                475                 :                :         else
                                476                 :                :         {
                                477                 :                :             LogicalRepWorker *syncworker;
                                478                 :                : 
                                479                 :                :             /*
                                480                 :                :              * Look for a sync worker for this relation.
                                481                 :                :              */
 3191                           482                 :           1641 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                483                 :                : 
   50 akapila@postgresql.o      484                 :GNC        1641 :             syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
                                485                 :           1641 :                                                 MyLogicalRepWorker->subid,
                                486                 :                :                                                 rstate->relid, false);
                                487                 :                : 
 3191 peter_e@gmx.net           488         [ +  + ]:CBC        1641 :             if (syncworker)
                                489                 :                :             {
                                490                 :                :                 /* Found one, update our copy of its state */
                                491         [ -  + ]:            747 :                 SpinLockAcquire(&syncworker->relmutex);
                                492                 :            747 :                 rstate->state = syncworker->relstate;
                                493                 :            747 :                 rstate->lsn = syncworker->relstate_lsn;
 3092 tgl@sss.pgh.pa.us         494         [ +  + ]:            747 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                495                 :                :                 {
                                496                 :                :                     /*
                                497                 :                :                      * Sync worker is waiting for apply.  Tell sync worker it
                                498                 :                :                      * can catchup now.
                                499                 :                :                      */
                                500                 :            183 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
                                501                 :            183 :                     syncworker->relstate_lsn =
                                502                 :            183 :                         Max(syncworker->relstate_lsn, current_lsn);
                                503                 :                :                 }
 3191 peter_e@gmx.net           504                 :            747 :                 SpinLockRelease(&syncworker->relmutex);
                                505                 :                : 
                                506                 :                :                 /* If we told worker to catch up, wait for it. */
 3092 tgl@sss.pgh.pa.us         507         [ +  + ]:            747 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                508                 :                :                 {
                                509                 :                :                     /* Signal the sync worker, as it may be waiting for us. */
                                510         [ +  - ]:            183 :                     if (syncworker->proc)
                                511                 :            183 :                         logicalrep_worker_wakeup_ptr(syncworker);
                                512                 :                : 
                                513                 :                :                     /* Now safe to release the LWLock */
                                514                 :            183 :                     LWLockRelease(LogicalRepWorkerLock);
                                515                 :                : 
  737 akapila@postgresql.o      516         [ +  - ]:            183 :                     if (started_tx)
                                517                 :                :                     {
                                518                 :                :                         /*
                                519                 :                :                          * We must commit the existing transaction to release
                                520                 :                :                          * the existing locks before entering a busy loop.
                                521                 :                :                          * This is required to avoid any undetected deadlocks
                                522                 :                :                          * due to any existing lock as deadlock detector won't
                                523                 :                :                          * be able to detect the waits on the latch.
                                524                 :                :                          *
                                525                 :                :                          * Also close any tables prior to the commit.
                                526                 :                :                          */
  138                           527         [ +  + ]:            183 :                         if (rel)
                                528                 :                :                         {
                                529                 :             30 :                             table_close(rel, NoLock);
                                530                 :             30 :                             rel = NULL;
                                531                 :                :                         }
  737                           532                 :            183 :                         CommitTransactionCommand();
                                533                 :            183 :                         pgstat_report_stat(false);
                                534                 :                :                     }
                                535                 :                : 
                                536                 :                :                     /*
                                537                 :                :                      * Enter busy loop and wait for synchronization worker to
                                538                 :                :                      * reach expected state (or die trying).
                                539                 :                :                      */
                                540                 :            183 :                     StartTransactionCommand();
                                541                 :            183 :                     started_tx = true;
                                542                 :                : 
   62 akapila@postgresql.o      543                 :GNC         183 :                     wait_for_table_state_change(rstate->relid,
                                544                 :                :                                                 SUBREL_STATE_SYNCDONE);
                                545                 :                :                 }
                                546                 :                :                 else
 3092 tgl@sss.pgh.pa.us         547                 :CBC         564 :                     LWLockRelease(LogicalRepWorkerLock);
                                548                 :                :             }
                                549                 :                :             else
                                550                 :                :             {
                                551                 :                :                 /*
                                552                 :                :                  * If there is no sync worker for this table yet, count
                                553                 :                :                  * running sync workers for this subscription, while we have
                                554                 :                :                  * the lock.
                                555                 :                :                  */
                                556                 :                :                 int         nsyncworkers =
  943                           557                 :            894 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
                                558                 :                :                 struct tablesync_start_time_mapping *hentry;
                                559                 :                :                 bool        found;
                                560                 :                : 
                                561                 :                :                 /* Now safe to release the LWLock */
 3092                           562                 :            894 :                 LWLockRelease(LogicalRepWorkerLock);
                                563                 :                : 
   42 akapila@postgresql.o      564                 :GNC         894 :                 hentry = hash_search(last_start_times, &rstate->relid,
                                565                 :                :                                      HASH_ENTER, &found);
                                566         [ +  + ]:            894 :                 if (!found)
                                567                 :            192 :                     hentry->last_start_time = 0;
                                568                 :                : 
                                569                 :            894 :                 launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
                                570                 :                :                                    rstate->relid, &hentry->last_start_time);
                                571                 :                :             }
                                572                 :                :         }
                                573                 :                :     }
                                574                 :                : 
                                575                 :                :     /* Close table if opened */
  138 akapila@postgresql.o      576         [ +  + ]:CBC        3811 :     if (rel)
                                577                 :            151 :         table_close(rel, NoLock);
                                578                 :                : 
                                579                 :                : 
 3145 peter_e@gmx.net           580         [ +  + ]:           3811 :     if (started_tx)
                                581                 :                :     {
                                582                 :                :         /*
                                583                 :                :          * Even when the two_phase mode is requested by the user, it remains
                                584                 :                :          * as 'pending' until all tablesyncs have reached READY state.
                                585                 :                :          *
                                586                 :                :          * When this happens, we restart the apply worker and (if the
                                587                 :                :          * conditions are still ok) then the two_phase tri-state will become
                                588                 :                :          * 'enabled' at that time.
                                589                 :                :          *
                                590                 :                :          * Note: If the subscription has no tables then leave the state as
                                591                 :                :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
                                592                 :                :          * work.
                                593                 :                :          */
 1076 tgl@sss.pgh.pa.us         594         [ +  + ]:           1057 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
                                595                 :                :         {
                                596                 :             40 :             CommandCounterIncrement();  /* make updates visible */
                                597         [ +  + ]:             40 :             if (AllTablesyncsReady())
                                598                 :                :             {
                                599         [ +  - ]:              6 :                 ereport(LOG,
                                600                 :                :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                601                 :                :                                 MySubscription->name)));
                                602                 :              6 :                 should_exit = true;
                                603                 :                :             }
                                604                 :                :         }
                                605                 :                : 
 3145 peter_e@gmx.net           606                 :           1057 :         CommitTransactionCommand();
 1351 andres@anarazel.de        607                 :           1057 :         pgstat_report_stat(true);
                                608                 :                :     }
                                609                 :                : 
 1076 tgl@sss.pgh.pa.us         610         [ +  + ]:           3811 :     if (should_exit)
                                611                 :                :     {
                                612                 :                :         /*
                                613                 :                :          * Reset the last-start time for this worker so that the launcher will
                                614                 :                :          * restart it without waiting for wal_retrieve_retry_interval.
                                615                 :                :          */
 1060                           616                 :              6 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
                                617                 :                : 
 1076                           618                 :              6 :         proc_exit(0);
                                619                 :                :     }
 3191 peter_e@gmx.net           620                 :           3805 : }
                                621                 :                : 
                                622                 :                : /*
                                623                 :                :  * Create list of columns for COPY based on logical relation mapping.
                                624                 :                :  */
                                625                 :                : static List *
                                626                 :            195 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
                                627                 :                : {
                                628                 :            195 :     List       *attnamelist = NIL;
                                629                 :                :     int         i;
                                630                 :                : 
 3135                           631         [ +  + ]:            522 :     for (i = 0; i < rel->remoterel.natts; i++)
                                632                 :                :     {
 3191                           633                 :            327 :         attnamelist = lappend(attnamelist,
 3135                           634                 :            327 :                               makeString(rel->remoterel.attnames[i]));
                                635                 :                :     }
                                636                 :                : 
                                637                 :                : 
 3191                           638                 :            195 :     return attnamelist;
                                639                 :                : }
                                640                 :                : 
                                641                 :                : /*
                                642                 :                :  * Data source callback for the COPY FROM, which reads from the remote
                                643                 :                :  * connection and passes the data back to our local COPY.
                                644                 :                :  */
                                645                 :                : static int
                                646                 :          13999 : copy_read_data(void *outbuf, int minread, int maxread)
                                647                 :                : {
 3136 bruce@momjian.us          648                 :          13999 :     int         bytesread = 0;
                                649                 :                :     int         avail;
                                650                 :                : 
                                651                 :                :     /* If there are some leftover data from previous read, use it. */
 3191 peter_e@gmx.net           652                 :          13999 :     avail = copybuf->len - copybuf->cursor;
                                653         [ -  + ]:          13999 :     if (avail)
                                654                 :                :     {
 3191 peter_e@gmx.net           655         [ #  # ]:UBC           0 :         if (avail > maxread)
                                656                 :              0 :             avail = maxread;
                                657                 :              0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                658                 :              0 :         copybuf->cursor += avail;
                                659                 :              0 :         maxread -= avail;
                                660                 :              0 :         bytesread += avail;
                                661                 :                :     }
                                662                 :                : 
 3120 peter_e@gmx.net           663   [ +  -  +  - ]:CBC       13999 :     while (maxread > 0 && bytesread < minread)
                                664                 :                :     {
 3191                           665                 :          13999 :         pgsocket    fd = PGINVALID_SOCKET;
                                666                 :                :         int         len;
                                667                 :          13999 :         char       *buf = NULL;
                                668                 :                : 
                                669                 :                :         for (;;)
                                670                 :                :         {
                                671                 :                :             /* Try read the data. */
 1680 alvherre@alvh.no-ip.      672                 :          13999 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                                673                 :                : 
 3191 peter_e@gmx.net           674         [ -  + ]:          13999 :             CHECK_FOR_INTERRUPTS();
                                675                 :                : 
                                676         [ -  + ]:          13999 :             if (len == 0)
 3191 peter_e@gmx.net           677                 :UBC           0 :                 break;
 3191 peter_e@gmx.net           678         [ +  + ]:CBC       13999 :             else if (len < 0)
                                679                 :          13999 :                 return bytesread;
                                680                 :                :             else
                                681                 :                :             {
                                682                 :                :                 /* Process the data */
                                683                 :          13806 :                 copybuf->data = buf;
                                684                 :          13806 :                 copybuf->len = len;
                                685                 :          13806 :                 copybuf->cursor = 0;
                                686                 :                : 
                                687                 :          13806 :                 avail = copybuf->len - copybuf->cursor;
                                688         [ -  + ]:          13806 :                 if (avail > maxread)
 3191 peter_e@gmx.net           689                 :UBC           0 :                     avail = maxread;
 3191 peter_e@gmx.net           690                 :CBC       13806 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
  384 peter@eisentraut.org      691                 :          13806 :                 outbuf = (char *) outbuf + avail;
 3191 peter_e@gmx.net           692                 :          13806 :                 copybuf->cursor += avail;
                                693                 :          13806 :                 maxread -= avail;
                                694                 :          13806 :                 bytesread += avail;
                                695                 :                :             }
                                696                 :                : 
                                697   [ +  -  +  - ]:          13806 :             if (maxread <= 0 || bytesread >= minread)
                                698                 :          13806 :                 return bytesread;
                                699                 :                :         }
                                700                 :                : 
                                701                 :                :         /*
                                702                 :                :          * Wait for more data or latch.
                                703                 :                :          */
 2581 tmunro@postgresql.or      704                 :UBC           0 :         (void) WaitLatchOrSocket(MyLatch,
                                705                 :                :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
                                706                 :                :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                707                 :                :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
                                708                 :                : 
 3116 andres@anarazel.de        709                 :              0 :         ResetLatch(MyLatch);
                                710                 :                :     }
                                711                 :                : 
 3191 peter_e@gmx.net           712                 :              0 :     return bytesread;
                                713                 :                : }
                                714                 :                : 
                                715                 :                : 
                                716                 :                : /*
                                717                 :                :  * Get information about remote relation in similar fashion the RELATION
                                718                 :                :  * message provides during replication.
                                719                 :                :  *
                                720                 :                :  * This function also returns (a) the relation qualifications to be used in
                                721                 :                :  * the COPY command, and (b) whether the remote relation has published any
                                722                 :                :  * generated column.
                                723                 :                :  */
                                724                 :                : static void
  413 akapila@postgresql.o      725                 :CBC         197 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
                                726                 :                :                         List **qual, bool *gencol_published)
                                727                 :                : {
                                728                 :                :     WalRcvExecResult *res;
                                729                 :                :     StringInfoData cmd;
                                730                 :                :     TupleTableSlot *slot;
 2099 peter@eisentraut.org      731                 :            197 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
  413 akapila@postgresql.o      732                 :            197 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 1394                           733                 :            197 :     Oid         qualRow[] = {TEXTOID};
                                734                 :                :     bool        isnull;
                                735                 :                :     int         natt;
  418 michael@paquier.xyz       736                 :            197 :     StringInfo  pub_names = NULL;
 1362 tomas.vondra@postgre      737                 :            197 :     Bitmapset  *included_cols = NULL;
  413 akapila@postgresql.o      738                 :            197 :     int         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
                                739                 :                : 
 3191 peter_e@gmx.net           740                 :            197 :     lrel->nspname = nspname;
                                741                 :            197 :     lrel->relname = relname;
                                742                 :                : 
                                743                 :                :     /* First fetch Oid and replica identity. */
                                744                 :            197 :     initStringInfo(&cmd);
 2099 peter@eisentraut.org      745                 :            197 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
                                746                 :                :                      "  FROM pg_catalog.pg_class c"
                                747                 :                :                      "  INNER JOIN pg_catalog.pg_namespace n"
                                748                 :                :                      "        ON (c.relnamespace = n.oid)"
                                749                 :                :                      " WHERE n.nspname = %s"
                                750                 :                :                      "   AND c.relname = %s",
                                751                 :                :                      quote_literal_cstr(nspname),
                                752                 :                :                      quote_literal_cstr(relname));
 1680 alvherre@alvh.no-ip.      753                 :            197 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                754                 :                :                       lengthof(tableRow), tableRow);
                                755                 :                : 
 3191 peter_e@gmx.net           756         [ -  + ]:            197 :     if (res->status != WALRCV_OK_TUPLES)
 3191 peter_e@gmx.net           757         [ #  # ]:UBC           0 :         ereport(ERROR,
                                758                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                759                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                760                 :                :                         nspname, relname, res->err)));
                                761                 :                : 
 2589 andres@anarazel.de        762                 :CBC         197 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3191 peter_e@gmx.net           763         [ -  + ]:            197 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 3191 peter_e@gmx.net           764         [ #  # ]:UBC           0 :         ereport(ERROR,
                                765                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                766                 :                :                  errmsg("table \"%s.%s\" not found on publisher",
                                767                 :                :                         nspname, relname)));
                                768                 :                : 
 3191 peter_e@gmx.net           769                 :CBC         197 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
                                770         [ -  + ]:            197 :     Assert(!isnull);
                                771                 :            197 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
                                772         [ -  + ]:            197 :     Assert(!isnull);
 2099 peter@eisentraut.org      773                 :            197 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
                                774         [ -  + ]:            197 :     Assert(!isnull);
                                775                 :                : 
 3191 peter_e@gmx.net           776                 :            197 :     ExecDropSingleTupleTableSlot(slot);
                                777                 :            197 :     walrcv_clear_result(res);
                                778                 :                : 
                                779                 :                : 
                                780                 :                :     /*
                                781                 :                :      * Get column lists for each relation.
                                782                 :                :      *
                                783                 :                :      * We need to do this before fetching info about column names and types,
                                784                 :                :      * so that we can skip columns that should not be replicated.
                                785                 :                :      */
  413 akapila@postgresql.o      786         [ +  - ]:            197 :     if (server_version >= 150000)
                                787                 :                :     {
                                788                 :                :         WalRcvExecResult *pubres;
                                789                 :                :         TupleTableSlot *tslot;
 1294                           790                 :            197 :         Oid         attrsRow[] = {INT2VECTOROID};
                                791                 :                : 
                                792                 :                :         /* Build the pub_names comma-separated string. */
  418 michael@paquier.xyz       793                 :            197 :         pub_names = makeStringInfo();
                                794                 :            197 :         GetPublicationsStr(MySubscription->publications, pub_names, true);
                                795                 :                : 
                                796                 :                :         /*
                                797                 :                :          * Fetch info about column lists for the relation (from all the
                                798                 :                :          * publications).
                                799                 :                :          */
 1362 tomas.vondra@postgre      800                 :            197 :         resetStringInfo(&cmd);
                                801                 :            197 :         appendStringInfo(&cmd,
                                802                 :                :                          "SELECT DISTINCT"
                                803                 :                :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                                804                 :                :                          "   THEN NULL ELSE gpt.attrs END)"
                                805                 :                :                          "  FROM pg_publication p,"
                                806                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                                807                 :                :                          "  pg_class c"
                                808                 :                :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                                809                 :                :                          "   AND p.pubname IN ( %s )",
                                810                 :                :                          lrel->remoteid,
                                811                 :                :                          pub_names->data);
                                812                 :                : 
                                813                 :            197 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                814                 :                :                              lengthof(attrsRow), attrsRow);
                                815                 :                : 
                                816         [ -  + ]:            197 :         if (pubres->status != WALRCV_OK_TUPLES)
 1362 tomas.vondra@postgre      817         [ #  # ]:UBC           0 :             ereport(ERROR,
                                818                 :                :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                                819                 :                :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                                820                 :                :                             nspname, relname, pubres->err)));
                                821                 :                : 
                                822                 :                :         /*
                                823                 :                :          * We don't support the case where the column list is different for
                                824                 :                :          * the same table when combining publications. See comments atop
                                825                 :                :          * fetch_relation_list. So there should be only one row returned.
                                826                 :                :          * Although we already checked this when creating the subscription, we
                                827                 :                :          * still need to check here in case the column list was changed after
                                828                 :                :          * creating the subscription and before the sync worker is started.
                                829                 :                :          */
 1294 akapila@postgresql.o      830         [ -  + ]:CBC         197 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
 1294 akapila@postgresql.o      831         [ #  # ]:UBC           0 :             ereport(ERROR,
                                832                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                833                 :                :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                                834                 :                :                            nspname, relname));
                                835                 :                : 
                                836                 :                :         /*
                                837                 :                :          * Get the column list and build a single bitmap with the attnums.
                                838                 :                :          *
                                839                 :                :          * If we find a NULL value, it means all the columns should be
                                840                 :                :          * replicated.
                                841                 :                :          */
 1215 drowley@postgresql.o      842                 :CBC         197 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
                                843         [ +  - ]:            197 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
                                844                 :                :         {
                                845                 :            197 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
                                846                 :                : 
 1294 akapila@postgresql.o      847         [ +  + ]:            197 :             if (!isnull)
                                848                 :                :             {
                                849                 :                :                 ArrayType  *arr;
                                850                 :                :                 int         nelems;
                                851                 :                :                 int16      *elems;
                                852                 :                : 
                                853                 :             22 :                 arr = DatumGetArrayTypeP(cfval);
                                854                 :             22 :                 nelems = ARR_DIMS(arr)[0];
                                855         [ -  + ]:             22 :                 elems = (int16 *) ARR_DATA_PTR(arr);
                                856                 :                : 
                                857         [ +  + ]:             59 :                 for (natt = 0; natt < nelems; natt++)
                                858                 :             37 :                     included_cols = bms_add_member(included_cols, elems[natt]);
                                859                 :                :             }
                                860                 :                : 
 1215 drowley@postgresql.o      861                 :            197 :             ExecClearTuple(tslot);
                                862                 :                :         }
                                863                 :            197 :         ExecDropSingleTupleTableSlot(tslot);
                                864                 :                : 
 1362 tomas.vondra@postgre      865                 :            197 :         walrcv_clear_result(pubres);
                                866                 :                :     }
                                867                 :                : 
                                868                 :                :     /*
                                869                 :                :      * Now fetch column names and types.
                                870                 :                :      */
 3191 peter_e@gmx.net           871                 :            197 :     resetStringInfo(&cmd);
  250 drowley@postgresql.o      872                 :            197 :     appendStringInfoString(&cmd,
                                873                 :                :                            "SELECT a.attnum,"
                                874                 :                :                            "       a.attname,"
                                875                 :                :                            "       a.atttypid,"
                                876                 :                :                            "       a.attnum = ANY(i.indkey)");
                                877                 :                : 
                                878                 :                :     /* Generated columns can be replicated since version 18. */
  413 akapila@postgresql.o      879         [ +  - ]:            197 :     if (server_version >= 180000)
  250 drowley@postgresql.o      880                 :            197 :         appendStringInfoString(&cmd, ", a.attgenerated != ''");
                                881                 :                : 
  413 akapila@postgresql.o      882         [ +  - ]:            394 :     appendStringInfo(&cmd,
                                883                 :                :                      "  FROM pg_catalog.pg_attribute a"
                                884                 :                :                      "  LEFT JOIN pg_catalog.pg_index i"
                                885                 :                :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                                886                 :                :                      " WHERE a.attnum > 0::pg_catalog.int2"
                                887                 :                :                      "   AND NOT a.attisdropped %s"
                                888                 :                :                      "   AND a.attrelid = %u"
                                889                 :                :                      " ORDER BY a.attnum",
                                890                 :                :                      lrel->remoteid,
                                891         [ -  + ]:            197 :                      (server_version >= 120000 && server_version < 180000 ?
                                892                 :                :                       "AND a.attgenerated = ''" : ""),
                                893                 :                :                      lrel->remoteid);
 1680 alvherre@alvh.no-ip.      894         [ +  - ]:            197 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                895                 :                :                       server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
                                896                 :                : 
 3191 peter_e@gmx.net           897         [ -  + ]:            197 :     if (res->status != WALRCV_OK_TUPLES)
 3191 peter_e@gmx.net           898         [ #  # ]:UBC           0 :         ereport(ERROR,
                                899                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                900                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                901                 :                :                         nspname, relname, res->err)));
                                902                 :                : 
                                903                 :                :     /* We don't know the number of rows coming, so allocate enough space. */
 3191 peter_e@gmx.net           904                 :CBC         197 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
                                905                 :            197 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
                                906                 :            197 :     lrel->attkeys = NULL;
                                907                 :                : 
                                908                 :                :     /*
                                909                 :                :      * Store the columns as a list of names.  Ignore those that are not
                                910                 :                :      * present in the column list, if there is one.
                                911                 :                :      */
                                912                 :            197 :     natt = 0;
 2589 andres@anarazel.de        913                 :            197 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3191 peter_e@gmx.net           914         [ +  + ]:            561 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                                915                 :                :     {
                                916                 :                :         char       *rel_colname;
                                917                 :                :         AttrNumber  attnum;
                                918                 :                : 
 1362 tomas.vondra@postgre      919                 :            364 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
                                920         [ -  + ]:            364 :         Assert(!isnull);
                                921                 :                : 
                                922                 :                :         /* If the column is not in the column list, skip it. */
                                923   [ +  +  +  + ]:            364 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
                                924                 :                :         {
                                925                 :             31 :             ExecClearTuple(slot);
                                926                 :             31 :             continue;
                                927                 :                :         }
                                928                 :                : 
                                929                 :            333 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 3191 peter_e@gmx.net           930         [ -  + ]:            333 :         Assert(!isnull);
                                931                 :                : 
 1362 tomas.vondra@postgre      932                 :            333 :         lrel->attnames[natt] = rel_colname;
                                933                 :            333 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 3191 peter_e@gmx.net           934         [ -  + ]:            333 :         Assert(!isnull);
                                935                 :                : 
 1362 tomas.vondra@postgre      936         [ +  + ]:            333 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 3191 peter_e@gmx.net           937                 :            109 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
                                938                 :                : 
                                939                 :                :         /* Remember if the remote table has published any generated column. */
  413 akapila@postgresql.o      940   [ +  -  +  - ]:            333 :         if (server_version >= 180000 && !(*gencol_published))
                                941                 :                :         {
                                942                 :            333 :             *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
                                943         [ -  + ]:            333 :             Assert(!isnull);
                                944                 :                :         }
                                945                 :                : 
                                946                 :                :         /* Should never happen. */
 3191 peter_e@gmx.net           947         [ -  + ]:            333 :         if (++natt >= MaxTupleAttributeNumber)
 3191 peter_e@gmx.net           948         [ #  # ]:UBC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
                                949                 :                :                  nspname, relname);
                                950                 :                : 
 3191 peter_e@gmx.net           951                 :CBC         333 :         ExecClearTuple(slot);
                                952                 :                :     }
                                953                 :            197 :     ExecDropSingleTupleTableSlot(slot);
                                954                 :                : 
                                955                 :            197 :     lrel->natts = natt;
                                956                 :                : 
                                957                 :            197 :     walrcv_clear_result(res);
                                958                 :                : 
                                959                 :                :     /*
                                960                 :                :      * Get relation's row filter expressions. DISTINCT avoids the same
                                961                 :                :      * expression of a table in multiple publications from being included
                                962                 :                :      * multiple times in the final expression.
                                963                 :                :      *
                                964                 :                :      * We need to copy the row even if it matches just one of the
                                965                 :                :      * publications, so we later combine all the quals with OR.
                                966                 :                :      *
                                967                 :                :      * For initial synchronization, row filtering can be ignored in following
                                968                 :                :      * cases:
                                969                 :                :      *
                                970                 :                :      * 1) one of the subscribed publications for the table hasn't specified
                                971                 :                :      * any row filter
                                972                 :                :      *
                                973                 :                :      * 2) one of the subscribed publications has puballtables set to true
                                974                 :                :      *
                                975                 :                :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
                                976                 :                :      * that includes this relation
                                977                 :                :      */
  413 akapila@postgresql.o      978         [ +  - ]:            197 :     if (server_version >= 150000)
                                979                 :                :     {
                                980                 :                :         /* Reuse the already-built pub_names. */
  418 michael@paquier.xyz       981         [ -  + ]:            197 :         Assert(pub_names != NULL);
                                982                 :                : 
                                983                 :                :         /* Check for row filters. */
 1394 akapila@postgresql.o      984                 :            197 :         resetStringInfo(&cmd);
                                985                 :            197 :         appendStringInfo(&cmd,
                                986                 :                :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
                                987                 :                :                          "  FROM pg_publication p,"
                                988                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                                989                 :                :                          " WHERE gpt.relid = %u"
                                990                 :                :                          "   AND p.pubname IN ( %s )",
                                991                 :                :                          lrel->remoteid,
                                992                 :                :                          pub_names->data);
                                993                 :                : 
                                994                 :            197 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
                                995                 :                : 
                                996         [ -  + ]:            197 :         if (res->status != WALRCV_OK_TUPLES)
 1394 akapila@postgresql.o      997         [ #  # ]:UBC           0 :             ereport(ERROR,
                                998                 :                :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                                999                 :                :                             nspname, relname, res->err)));
                               1000                 :                : 
                               1001                 :                :         /*
                               1002                 :                :          * Multiple row filter expressions for the same table will be combined
                               1003                 :                :          * by COPY using OR. If any of the filter expressions for this table
                               1004                 :                :          * are null, it means the whole table will be copied. In this case it
                               1005                 :                :          * is not necessary to construct a unified row filter expression at
                               1006                 :                :          * all.
                               1007                 :                :          */
 1394 akapila@postgresql.o     1008                 :CBC         197 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
                               1009         [ +  + ]:            212 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                               1010                 :                :         {
                               1011                 :            201 :             Datum       rf = slot_getattr(slot, 1, &isnull);
                               1012                 :                : 
                               1013         [ +  + ]:            201 :             if (!isnull)
                               1014                 :             15 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
                               1015                 :                :             else
                               1016                 :                :             {
                               1017                 :                :                 /* Ignore filters and cleanup as necessary. */
                               1018         [ +  + ]:            186 :                 if (*qual)
                               1019                 :                :                 {
                               1020                 :              3 :                     list_free_deep(*qual);
                               1021                 :              3 :                     *qual = NIL;
                               1022                 :                :                 }
                               1023                 :            186 :                 break;
                               1024                 :                :             }
                               1025                 :                : 
                               1026                 :             15 :             ExecClearTuple(slot);
                               1027                 :                :         }
                               1028                 :            197 :         ExecDropSingleTupleTableSlot(slot);
                               1029                 :                : 
                               1030                 :            197 :         walrcv_clear_result(res);
  418 michael@paquier.xyz      1031                 :            197 :         destroyStringInfo(pub_names);
                               1032                 :                :     }
                               1033                 :                : 
 3191 peter_e@gmx.net          1034                 :            197 :     pfree(cmd.data);
                               1035                 :            197 : }
                               1036                 :                : 
                               1037                 :                : /*
                               1038                 :                :  * Copy existing data of a table from publisher.
                               1039                 :                :  *
                               1040                 :                :  * Caller is responsible for locking the local relation.
                               1041                 :                :  */
                               1042                 :                : static void
                               1043                 :            197 : copy_table(Relation rel)
                               1044                 :                : {
                               1045                 :                :     LogicalRepRelMapEntry *relmapentry;
                               1046                 :                :     LogicalRepRelation lrel;
 1394 akapila@postgresql.o     1047                 :            197 :     List       *qual = NIL;
                               1048                 :                :     WalRcvExecResult *res;
                               1049                 :                :     StringInfoData cmd;
                               1050                 :                :     CopyFromState cstate;
                               1051                 :                :     List       *attnamelist;
                               1052                 :                :     ParseState *pstate;
 1000                          1053                 :            197 :     List       *options = NIL;
  413                          1054                 :            197 :     bool        gencol_published = false;
                               1055                 :                : 
                               1056                 :                :     /* Get the publisher relation info. */
 3191 peter_e@gmx.net          1057                 :            197 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
  413 akapila@postgresql.o     1058                 :            197 :                             RelationGetRelationName(rel), &lrel, &qual,
                               1059                 :                :                             &gencol_published);
                               1060                 :                : 
                               1061                 :                :     /* Put the relation into relmap. */
 3191 peter_e@gmx.net          1062                 :            197 :     logicalrep_relmap_update(&lrel);
                               1063                 :                : 
                               1064                 :                :     /* Map the publisher relation to local one. */
                               1065                 :            197 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
                               1066         [ -  + ]:            195 :     Assert(rel == relmapentry->localrel);
                               1067                 :                : 
                               1068                 :                :     /* Start copy on the publisher. */
                               1069                 :            195 :     initStringInfo(&cmd);
                               1070                 :                : 
                               1071                 :                :     /* Regular or partitioned table with no row filter or generated columns */
   27 msawada@postgresql.o     1072   [ +  +  +  - ]:GNC         195 :     if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
                               1073   [ +  +  +  + ]:            195 :         && qual == NIL && !gencol_published)
                               1074                 :                :     {
  756 akapila@postgresql.o     1075                 :CBC         181 :         appendStringInfo(&cmd, "COPY %s",
 2099 peter@eisentraut.org     1076                 :            181 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1077                 :                : 
                               1078                 :                :         /* If the table has columns, then specify the columns */
  756 akapila@postgresql.o     1079         [ +  + ]:            181 :         if (lrel.natts)
                               1080                 :                :         {
                               1081                 :            180 :             appendStringInfoString(&cmd, " (");
                               1082                 :                : 
                               1083                 :                :             /*
                               1084                 :                :              * XXX Do we need to list the columns in all cases? Maybe we're
                               1085                 :                :              * replicating all columns?
                               1086                 :                :              */
                               1087         [ +  + ]:            486 :             for (int i = 0; i < lrel.natts; i++)
                               1088                 :                :             {
                               1089         [ +  + ]:            306 :                 if (i > 0)
                               1090                 :            126 :                     appendStringInfoString(&cmd, ", ");
                               1091                 :                : 
                               1092                 :            306 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1093                 :                :             }
                               1094                 :                : 
  616 drowley@postgresql.o     1095                 :            180 :             appendStringInfoChar(&cmd, ')');
                               1096                 :                :         }
                               1097                 :                : 
  756 akapila@postgresql.o     1098                 :            181 :         appendStringInfoString(&cmd, " TO STDOUT");
                               1099                 :                :     }
                               1100                 :                :     else
                               1101                 :                :     {
                               1102                 :                :         /*
                               1103                 :                :          * For non-tables and tables with row filters, we need to do COPY
                               1104                 :                :          * (SELECT ...), but we can't just do SELECT * because we may need to
                               1105                 :                :          * copy only subset of columns including generated columns. For tables
                               1106                 :                :          * with any row filters, build a SELECT query with OR'ed row filters
                               1107                 :                :          * for COPY.
                               1108                 :                :          *
                               1109                 :                :          * We also need to use this same COPY (SELECT ...) syntax when
                               1110                 :                :          * generated columns are published, because copy of generated columns
                               1111                 :                :          * is not supported by the normal COPY.
                               1112                 :                :          */
 1889 drowley@postgresql.o     1113                 :             14 :         appendStringInfoString(&cmd, "COPY (SELECT ");
 2099 peter@eisentraut.org     1114         [ +  + ]:             35 :         for (int i = 0; i < lrel.natts; i++)
                               1115                 :                :         {
                               1116                 :             21 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1117         [ +  + ]:             21 :             if (i < lrel.natts - 1)
                               1118                 :              7 :                 appendStringInfoString(&cmd, ", ");
                               1119                 :                :         }
                               1120                 :                : 
 1394 akapila@postgresql.o     1121                 :             14 :         appendStringInfoString(&cmd, " FROM ");
                               1122                 :                : 
                               1123                 :                :         /*
                               1124                 :                :          * For regular tables, make sure we don't copy data from a child that
                               1125                 :                :          * inherits the named table as those will be copied separately.
                               1126                 :                :          */
                               1127         [ +  + ]:             14 :         if (lrel.relkind == RELKIND_RELATION)
                               1128                 :             11 :             appendStringInfoString(&cmd, "ONLY ");
                               1129                 :                : 
                               1130                 :             14 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1131                 :                :         /* list of OR'ed filters */
                               1132         [ +  + ]:             14 :         if (qual != NIL)
                               1133                 :                :         {
                               1134                 :                :             ListCell   *lc;
                               1135                 :             11 :             char       *q = strVal(linitial(qual));
                               1136                 :                : 
                               1137                 :             11 :             appendStringInfo(&cmd, " WHERE %s", q);
                               1138   [ +  -  +  +  :             12 :             for_each_from(lc, qual, 1)
                                              +  + ]
                               1139                 :                :             {
                               1140                 :              1 :                 q = strVal(lfirst(lc));
                               1141                 :              1 :                 appendStringInfo(&cmd, " OR %s", q);
                               1142                 :                :             }
                               1143                 :             11 :             list_free_deep(qual);
                               1144                 :                :         }
                               1145                 :                : 
                               1146                 :             14 :         appendStringInfoString(&cmd, ") TO STDOUT");
                               1147                 :                :     }
                               1148                 :                : 
                               1149                 :                :     /*
                               1150                 :                :      * Prior to v16, initial table synchronization will use text format even
                               1151                 :                :      * if the binary option is enabled for a subscription.
                               1152                 :                :      */
 1000                          1153         [ +  - ]:            195 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
                               1154         [ +  + ]:            195 :         MySubscription->binary)
                               1155                 :                :     {
                               1156                 :              5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
                               1157                 :              5 :         options = list_make1(makeDefElem("format",
                               1158                 :                :                                          (Node *) makeString("binary"), -1));
                               1159                 :                :     }
                               1160                 :                : 
 1680 alvherre@alvh.no-ip.     1161                 :            195 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 3191 peter_e@gmx.net          1162                 :            195 :     pfree(cmd.data);
                               1163         [ -  + ]:            195 :     if (res->status != WALRCV_OK_COPY_OUT)
 3191 peter_e@gmx.net          1164         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1165                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1166                 :                :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                               1167                 :                :                         lrel.nspname, lrel.relname, res->err)));
 3191 peter_e@gmx.net          1168                 :CBC         195 :     walrcv_clear_result(res);
                               1169                 :                : 
                               1170                 :            195 :     copybuf = makeStringInfo();
                               1171                 :                : 
 3166                          1172                 :            195 :     pstate = make_parsestate(NULL);
 2176 tgl@sss.pgh.pa.us        1173                 :            195 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                               1174                 :                :                                          NULL, false, false);
                               1175                 :                : 
 3191 peter_e@gmx.net          1176                 :            195 :     attnamelist = make_copy_attnamelist(relmapentry);
 1000 akapila@postgresql.o     1177                 :            195 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
                               1178                 :                : 
                               1179                 :                :     /* Do the copy */
 3191 peter_e@gmx.net          1180                 :            194 :     (void) CopyFrom(cstate);
                               1181                 :                : 
                               1182                 :            185 :     logicalrep_rel_close(relmapentry, NoLock);
                               1183                 :            185 : }
                               1184                 :                : 
                               1185                 :                : /*
                               1186                 :                :  * Determine the tablesync slot name.
                               1187                 :                :  *
                               1188                 :                :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
                               1189                 :                :  * on slot name length. We append system_identifier to avoid slot_name
                               1190                 :                :  * collision with subscriptions in other clusters. With the current scheme
                               1191                 :                :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
                               1192                 :                :  * length of slot_name will be 50.
                               1193                 :                :  *
                               1194                 :                :  * The returned slot name is stored in the supplied buffer (syncslotname) with
                               1195                 :                :  * the given size.
                               1196                 :                :  *
                               1197                 :                :  * Note: We don't use the subscription slot name as part of tablesync slot name
                               1198                 :                :  * because we are responsible for cleaning up these slots and it could become
                               1199                 :                :  * impossible to recalculate what name to cleanup if the subscription slot name
                               1200                 :                :  * had changed.
                               1201                 :                :  */
                               1202                 :                : void
 1769 akapila@postgresql.o     1203                 :            389 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                               1204                 :                :                                 char *syncslotname, Size szslot)
                               1205                 :                : {
 1766                          1206                 :            389 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
                               1207                 :                :              relid, GetSystemIdentifier());
 1769                          1208                 :            389 : }
                               1209                 :                : 
                               1210                 :                : /*
                               1211                 :                :  * Start syncing the table in the sync worker.
                               1212                 :                :  *
                               1213                 :                :  * If nothing needs to be done to sync the table, we exit the worker without
                               1214                 :                :  * any further action.
                               1215                 :                :  *
                               1216                 :                :  * The returned slot name is palloc'ed in current memory context.
                               1217                 :                :  */
                               1218                 :                : static char *
 3191 peter_e@gmx.net          1219                 :            199 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                               1220                 :                : {
                               1221                 :                :     char       *slotname;
                               1222                 :                :     char       *err;
                               1223                 :                :     char        relstate;
                               1224                 :                :     XLogRecPtr  relstate_lsn;
                               1225                 :                :     Relation    rel;
                               1226                 :                :     AclResult   aclresult;
                               1227                 :                :     WalRcvExecResult *res;
                               1228                 :                :     char        originname[NAMEDATALEN];
                               1229                 :                :     RepOriginId originid;
                               1230                 :                :     UserContext ucxt;
                               1231                 :                :     bool        must_use_password;
                               1232                 :                :     bool        run_as_owner;
                               1233                 :                : 
                               1234                 :                :     /* Check the state of the table synchronization. */
                               1235                 :            199 :     StartTransactionCommand();
 3163 fujii@postgresql.org     1236                 :            199 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                               1237                 :            199 :                                        MyLogicalRepWorker->relid,
                               1238                 :                :                                        &relstate_lsn);
  792 akapila@postgresql.o     1239                 :            199 :     CommitTransactionCommand();
                               1240                 :                : 
                               1241                 :                :     /* Is the use of a password mandatory? */
  916                          1242         [ +  + ]:            394 :     must_use_password = MySubscription->passwordrequired &&
  792                          1243         [ -  + ]:            195 :         !MySubscription->ownersuperuser;
                               1244                 :                : 
 3191 peter_e@gmx.net          1245         [ -  + ]:            199 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 3163 fujii@postgresql.org     1246                 :            199 :     MyLogicalRepWorker->relstate = relstate;
                               1247                 :            199 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 3191 peter_e@gmx.net          1248                 :            199 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1249                 :                : 
                               1250                 :                :     /*
                               1251                 :                :      * If synchronization is already done or no longer necessary, exit now
                               1252                 :                :      * that we've updated shared memory state.
                               1253                 :                :      */
 1889 alvherre@alvh.no-ip.     1254         [ -  + ]:            199 :     switch (relstate)
                               1255                 :                :     {
 1889 alvherre@alvh.no-ip.     1256                 :UBC           0 :         case SUBREL_STATE_SYNCDONE:
                               1257                 :                :         case SUBREL_STATE_READY:
                               1258                 :                :         case SUBREL_STATE_UNKNOWN:
   62 akapila@postgresql.o     1259                 :UNC           0 :             FinishSyncWorker(); /* doesn't return */
                               1260                 :                :     }
                               1261                 :                : 
                               1262                 :                :     /* Calculate the name of the tablesync slot. */
 1766 akapila@postgresql.o     1263                 :CBC         199 :     slotname = (char *) palloc(NAMEDATALEN);
                               1264                 :            199 :     ReplicationSlotNameForTablesync(MySubscription->oid,
                               1265                 :            199 :                                     MyLogicalRepWorker->relid,
                               1266                 :                :                                     slotname,
                               1267                 :                :                                     NAMEDATALEN);
                               1268                 :                : 
                               1269                 :                :     /*
                               1270                 :                :      * Here we use the slot name instead of the subscription name as the
                               1271                 :                :      * application_name, so that it is different from the leader apply worker,
                               1272                 :                :      * so that synchronous replication can distinguish them.
                               1273                 :                :      */
 1680 alvherre@alvh.no-ip.     1274                 :            199 :     LogRepWorkerWalRcvConn =
  681 akapila@postgresql.o     1275                 :            199 :         walrcv_connect(MySubscription->conninfo, true, true,
                               1276                 :                :                        must_use_password,
                               1277                 :                :                        slotname, &err);
 1680 alvherre@alvh.no-ip.     1278         [ -  + ]:            199 :     if (LogRepWorkerWalRcvConn == NULL)
 3191 peter_e@gmx.net          1279         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1280                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1281                 :                :                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
                               1282                 :                :                         MySubscription->name, err)));
                               1283                 :                : 
 1889 alvherre@alvh.no-ip.     1284   [ +  +  -  +  :CBC         199 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                                              -  - ]
                               1285                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
                               1286                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
                               1287                 :                : 
                               1288                 :                :     /* Assign the origin tracking record name. */
 1163 akapila@postgresql.o     1289                 :            199 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1290                 :            199 :                                        MyLogicalRepWorker->relid,
                               1291                 :                :                                        originname,
                               1292                 :                :                                        sizeof(originname));
                               1293                 :                : 
 1769                          1294         [ +  + ]:            199 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
                               1295                 :                :     {
                               1296                 :                :         /*
                               1297                 :                :          * We have previously errored out before finishing the copy so the
                               1298                 :                :          * replication slot might exist. We want to remove the slot if it
                               1299                 :                :          * already exists and proceed.
                               1300                 :                :          *
                               1301                 :                :          * XXX We could also instead try to drop the slot, last time we failed
                               1302                 :                :          * but for that, we might need to clean up the copy state as it might
                               1303                 :                :          * be in the middle of fetching the rows. Also, if there is a network
                               1304                 :                :          * breakdown then it wouldn't have succeeded so trying it next time
                               1305                 :                :          * seems like a better bet.
                               1306                 :                :          */
 1680 alvherre@alvh.no-ip.     1307                 :              9 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
                               1308                 :                :     }
 1769 akapila@postgresql.o     1309         [ -  + ]:            190 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
                               1310                 :                :     {
                               1311                 :                :         /*
                               1312                 :                :          * The COPY phase was previously done, but tablesync then crashed
                               1313                 :                :          * before it was able to finish normally.
                               1314                 :                :          */
 1769 akapila@postgresql.o     1315                 :UBC           0 :         StartTransactionCommand();
                               1316                 :                : 
                               1317                 :                :         /*
                               1318                 :                :          * The origin tracking name must already exist. It was created first
                               1319                 :                :          * time this tablesync was launched.
                               1320                 :                :          */
                               1321                 :              0 :         originid = replorigin_by_name(originname, false);
 1073                          1322                 :              0 :         replorigin_session_setup(originid, 0);
 1769                          1323                 :              0 :         replorigin_session_origin = originid;
                               1324                 :              0 :         *origin_startpos = replorigin_session_get_progress(false);
                               1325                 :                : 
                               1326                 :              0 :         CommitTransactionCommand();
                               1327                 :                : 
                               1328                 :              0 :         goto copy_table_done;
                               1329                 :                :     }
                               1330                 :                : 
 1889 alvherre@alvh.no-ip.     1331         [ -  + ]:CBC         199 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1332                 :            199 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                               1333                 :            199 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                               1334                 :            199 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1335                 :                : 
                               1336                 :                :     /* Update the state and make it visible to others. */
                               1337                 :            199 :     StartTransactionCommand();
                               1338                 :            199 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1339                 :            199 :                                MyLogicalRepWorker->relid,
                               1340                 :            199 :                                MyLogicalRepWorker->relstate,
  138 akapila@postgresql.o     1341                 :            199 :                                MyLogicalRepWorker->relstate_lsn,
                               1342                 :                :                                false);
 1889 alvherre@alvh.no-ip.     1343                 :            197 :     CommitTransactionCommand();
 1351 andres@anarazel.de       1344                 :            197 :     pgstat_report_stat(true);
                               1345                 :                : 
 1889 alvherre@alvh.no-ip.     1346                 :            197 :     StartTransactionCommand();
                               1347                 :                : 
                               1348                 :                :     /*
                               1349                 :                :      * Use a standard write lock here. It might be better to disallow access
                               1350                 :                :      * to the table while it's being synchronized. But we don't want to block
                               1351                 :                :      * the main apply process from working and it has to open the relation in
                               1352                 :                :      * RowExclusiveLock when remapping remote relation id to local one.
                               1353                 :                :      */
                               1354                 :            197 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
                               1355                 :                : 
                               1356                 :                :     /*
                               1357                 :                :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
                               1358                 :                :      * ensures that both the replication slot we create (see below) and the
                               1359                 :                :      * COPY are consistent with each other.
                               1360                 :                :      */
 1680                          1361                 :            197 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
                               1362                 :                :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                               1363                 :                :                       0, NULL);
 1889                          1364         [ -  + ]:            197 :     if (res->status != WALRCV_OK_COMMAND)
 1889 alvherre@alvh.no-ip.     1365         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1366                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1367                 :                :                  errmsg("table copy could not start transaction on publisher: %s",
                               1368                 :                :                         res->err)));
 1889 alvherre@alvh.no-ip.     1369                 :CBC         197 :     walrcv_clear_result(res);
                               1370                 :                : 
                               1371                 :                :     /*
                               1372                 :                :      * Create a new permanent logical decoding slot. This slot will be used
                               1373                 :                :      * for the catchup phase after COPY is done, so tell it to use the
                               1374                 :                :      * snapshot to make the final data consistent.
                               1375                 :                :      */
 1617 akapila@postgresql.o     1376                 :            197 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
                               1377                 :                :                        slotname, false /* permanent */ , false /* two_phase */ ,
                               1378                 :                :                        MySubscription->failover,
                               1379                 :                :                        CRS_USE_SNAPSHOT, origin_startpos);
                               1380                 :                : 
                               1381                 :                :     /*
                               1382                 :                :      * Setup replication origin tracking. The purpose of doing this before the
                               1383                 :                :      * copy is to avoid doing the copy again due to any error in setting up
                               1384                 :                :      * origin tracking.
                               1385                 :                :      */
 1769                          1386                 :            197 :     originid = replorigin_by_name(originname, true);
                               1387         [ +  - ]:            197 :     if (!OidIsValid(originid))
                               1388                 :                :     {
                               1389                 :                :         /*
                               1390                 :                :          * Origin tracking does not exist, so create it now.
                               1391                 :                :          *
                               1392                 :                :          * Then advance to the LSN got from walrcv_create_slot. This is WAL
                               1393                 :                :          * logged for the purpose of recovery. Locks are to prevent the
                               1394                 :                :          * replication origin from vanishing while advancing.
                               1395                 :                :          */
                               1396                 :            197 :         originid = replorigin_create(originname);
                               1397                 :                : 
                               1398                 :            197 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1399                 :            197 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                               1400                 :                :                            true /* go backward */ , true /* WAL log */ );
                               1401                 :            197 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1402                 :                : 
 1073                          1403                 :            197 :         replorigin_session_setup(originid, 0);
 1769                          1404                 :            197 :         replorigin_session_origin = originid;
                               1405                 :                :     }
                               1406                 :                :     else
                               1407                 :                :     {
 1769 akapila@postgresql.o     1408         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1409                 :                :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1410                 :                :                  errmsg("replication origin \"%s\" already exists",
                               1411                 :                :                         originname)));
                               1412                 :                :     }
                               1413                 :                : 
                               1414                 :                :     /*
                               1415                 :                :      * If the user did not opt to run as the owner of the subscription
                               1416                 :                :      * ('run_as_owner'), then copy the table as the owner of the table.
                               1417                 :                :      */
  922 msawada@postgresql.o     1418                 :CBC         197 :     run_as_owner = MySubscription->runasowner;
                               1419         [ +  + ]:            197 :     if (!run_as_owner)
                               1420                 :            196 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
                               1421                 :                : 
                               1422                 :                :     /*
                               1423                 :                :      * Check that our table sync worker has permission to insert into the
                               1424                 :                :      * target table.
                               1425                 :                :      */
                               1426                 :            197 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               1427                 :                :                                   ACL_INSERT);
                               1428         [ -  + ]:            197 :     if (aclresult != ACLCHECK_OK)
  922 msawada@postgresql.o     1429                 :UBC           0 :         aclcheck_error(aclresult,
                               1430                 :              0 :                        get_relkind_objtype(rel->rd_rel->relkind),
                               1431                 :              0 :                        RelationGetRelationName(rel));
                               1432                 :                : 
                               1433                 :                :     /*
                               1434                 :                :      * COPY FROM does not honor RLS policies.  That is not a problem for
                               1435                 :                :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
                               1436                 :                :      * who has it implicitly), but other roles should not be able to
                               1437                 :                :      * circumvent RLS.  Disallow logical replication into RLS enabled
                               1438                 :                :      * relations for such roles.
                               1439                 :                :      */
  922 msawada@postgresql.o     1440         [ -  + ]:CBC         197 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
  922 msawada@postgresql.o     1441         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1442                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1443                 :                :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                               1444                 :                :                         GetUserNameFromId(GetUserId(), true),
                               1445                 :                :                         RelationGetRelationName(rel))));
                               1446                 :                : 
                               1447                 :                :     /* Now do the initial data copy */
 1350 tomas.vondra@postgre     1448                 :CBC         197 :     PushActiveSnapshot(GetTransactionSnapshot());
                               1449                 :            197 :     copy_table(rel);
                               1450                 :            185 :     PopActiveSnapshot();
                               1451                 :                : 
 1680 alvherre@alvh.no-ip.     1452                 :            185 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 1889                          1453         [ -  + ]:            185 :     if (res->status != WALRCV_OK_COMMAND)
 1889 alvherre@alvh.no-ip.     1454         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1455                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1456                 :                :                  errmsg("table copy could not finish transaction on publisher: %s",
                               1457                 :                :                         res->err)));
 1889 alvherre@alvh.no-ip.     1458                 :CBC         185 :     walrcv_clear_result(res);
                               1459                 :                : 
  911 tgl@sss.pgh.pa.us        1460         [ +  + ]:            185 :     if (!run_as_owner)
  922 msawada@postgresql.o     1461                 :            184 :         RestoreUserContext(&ucxt);
                               1462                 :                : 
 1889 alvherre@alvh.no-ip.     1463                 :            185 :     table_close(rel, NoLock);
                               1464                 :                : 
                               1465                 :                :     /* Make the copy visible. */
                               1466                 :            185 :     CommandCounterIncrement();
                               1467                 :                : 
                               1468                 :                :     /*
                               1469                 :                :      * Update the persisted state to indicate the COPY phase is done; make it
                               1470                 :                :      * visible to others.
                               1471                 :                :      */
 1769 akapila@postgresql.o     1472                 :            185 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1473                 :            185 :                                MyLogicalRepWorker->relid,
                               1474                 :                :                                SUBREL_STATE_FINISHEDCOPY,
  138                          1475                 :            185 :                                MyLogicalRepWorker->relstate_lsn,
                               1476                 :                :                                false);
                               1477                 :                : 
 1769                          1478                 :            185 :     CommitTransactionCommand();
                               1479                 :                : 
                               1480                 :            185 : copy_table_done:
                               1481                 :                : 
                               1482         [ -  + ]:            185 :     elog(DEBUG1,
                               1483                 :                :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
                               1484                 :                :          originname, LSN_FORMAT_ARGS(*origin_startpos));
                               1485                 :                : 
                               1486                 :                :     /*
                               1487                 :                :      * We are done with the initial data synchronization, update the state.
                               1488                 :                :      */
 1889 alvherre@alvh.no-ip.     1489         [ -  + ]:            185 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1490                 :            185 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                               1491                 :            185 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                               1492                 :            185 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1493                 :                : 
                               1494                 :                :     /*
                               1495                 :                :      * Finally, wait until the leader apply worker tells us to catch up and
                               1496                 :                :      * then return to let LogicalRepApplyLoop do it.
                               1497                 :                :      */
                               1498                 :            185 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 3191 peter_e@gmx.net          1499                 :            185 :     return slotname;
                               1500                 :                : }
                               1501                 :                : 
                               1502                 :                : /*
                               1503                 :                :  * Execute the initial sync with error handling. Disable the subscription,
                               1504                 :                :  * if it's required.
                               1505                 :                :  *
                               1506                 :                :  * Allocate the slot name in long-lived context on return. Note that we don't
                               1507                 :                :  * handle FATAL errors which are probably because of system resource error and
                               1508                 :                :  * are not repeatable.
                               1509                 :                :  */
                               1510                 :                : static void
  867 akapila@postgresql.o     1511                 :            199 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
                               1512                 :                : {
                               1513                 :            199 :     char       *sync_slotname = NULL;
                               1514                 :                : 
                               1515         [ -  + ]:            199 :     Assert(am_tablesync_worker());
                               1516                 :                : 
                               1517         [ +  + ]:            199 :     PG_TRY();
                               1518                 :                :     {
                               1519                 :                :         /* Call initial sync. */
                               1520                 :            199 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
                               1521                 :                :     }
                               1522                 :             12 :     PG_CATCH();
                               1523                 :                :     {
                               1524         [ +  + ]:             12 :         if (MySubscription->disableonerr)
                               1525                 :              1 :             DisableSubscriptionAndExit();
                               1526                 :                :         else
                               1527                 :                :         {
                               1528                 :                :             /*
                               1529                 :                :              * Report the worker failed during table synchronization. Abort
                               1530                 :                :              * the current transaction so that the stats message is sent in an
                               1531                 :                :              * idle state.
                               1532                 :                :              */
                               1533                 :             11 :             AbortOutOfAnyTransaction();
   40 akapila@postgresql.o     1534                 :GNC          11 :             pgstat_report_subscription_error(MySubscription->oid,
                               1535                 :                :                                              WORKERTYPE_TABLESYNC);
                               1536                 :                : 
  867 akapila@postgresql.o     1537                 :CBC          11 :             PG_RE_THROW();
                               1538                 :                :         }
                               1539                 :                :     }
                               1540         [ -  + ]:            185 :     PG_END_TRY();
                               1541                 :                : 
                               1542                 :                :     /* allocate slot name in long-lived context */
                               1543                 :            185 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
                               1544                 :            185 :     pfree(sync_slotname);
                               1545                 :            185 : }
                               1546                 :                : 
                               1547                 :                : /*
                               1548                 :                :  * Runs the tablesync worker.
                               1549                 :                :  *
                               1550                 :                :  * It starts syncing tables. After a successful sync, sets streaming options
                               1551                 :                :  * and starts streaming to catchup with apply worker.
                               1552                 :                :  */
                               1553                 :                : static void
   14 nathan@postgresql.or     1554                 :GNC         199 : run_tablesync_worker(void)
                               1555                 :                : {
                               1556                 :                :     char        originname[NAMEDATALEN];
  867 akapila@postgresql.o     1557                 :CBC         199 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
                               1558                 :            199 :     char       *slotname = NULL;
                               1559                 :                :     WalRcvStreamOptions options;
                               1560                 :                : 
                               1561                 :            199 :     start_table_sync(&origin_startpos, &slotname);
                               1562                 :                : 
                               1563                 :            185 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1564                 :            185 :                                        MyLogicalRepWorker->relid,
                               1565                 :                :                                        originname,
                               1566                 :                :                                        sizeof(originname));
                               1567                 :                : 
                               1568                 :            185 :     set_apply_error_context_origin(originname);
                               1569                 :                : 
                               1570                 :            185 :     set_stream_options(&options, slotname, &origin_startpos);
                               1571                 :                : 
                               1572                 :            185 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
                               1573                 :                : 
                               1574                 :                :     /* Apply the changes till we catchup with the apply worker. */
                               1575                 :            185 :     start_apply(origin_startpos);
  867 akapila@postgresql.o     1576                 :UBC           0 : }
                               1577                 :                : 
                               1578                 :                : /* Logical Replication Tablesync worker entry point */
                               1579                 :                : void
   42 akapila@postgresql.o     1580                 :GNC         200 : TableSyncWorkerMain(Datum main_arg)
                               1581                 :                : {
  867 akapila@postgresql.o     1582                 :CBC         200 :     int         worker_slot = DatumGetInt32(main_arg);
                               1583                 :                : 
                               1584                 :            200 :     SetupApplyOrSyncWorker(worker_slot);
                               1585                 :                : 
                               1586                 :            199 :     run_tablesync_worker();
                               1587                 :                : 
   62 akapila@postgresql.o     1588                 :UNC           0 :     FinishSyncWorker();
                               1589                 :                : }
                               1590                 :                : 
                               1591                 :                : /*
                               1592                 :                :  * If the subscription has no tables then return false.
                               1593                 :                :  *
                               1594                 :                :  * Otherwise, are all tablesyncs READY?
                               1595                 :                :  *
                               1596                 :                :  * Note: This function is not suitable to be called from outside of apply or
                               1597                 :                :  * tablesync workers because MySubscription needs to be already initialized.
                               1598                 :                :  */
                               1599                 :                : bool
 1617 akapila@postgresql.o     1600                 :CBC         160 : AllTablesyncsReady(void)
                               1601                 :                : {
                               1602                 :                :     bool        started_tx;
                               1603                 :                :     bool        has_tables;
                               1604                 :                : 
                               1605                 :                :     /* We need up-to-date sync state info for subscription tables here. */
   42 akapila@postgresql.o     1606                 :GNC         160 :     FetchRelationStates(&has_tables, NULL, &started_tx);
                               1607                 :                : 
 1617 akapila@postgresql.o     1608         [ +  + ]:CBC         160 :     if (started_tx)
                               1609                 :                :     {
                               1610                 :             16 :         CommitTransactionCommand();
 1351 andres@anarazel.de       1611                 :             16 :         pgstat_report_stat(true);
                               1612                 :                :     }
                               1613                 :                : 
                               1614                 :                :     /*
                               1615                 :                :      * Return false when there are no tables in subscription or not all tables
                               1616                 :                :      * are in ready state; true otherwise.
                               1617                 :                :      */
   42 akapila@postgresql.o     1618   [ +  -  +  + ]:GNC         160 :     return has_tables && (table_states_not_ready == NIL);
                               1619                 :                : }
                               1620                 :                : 
                               1621                 :                : /*
                               1622                 :                :  * Return whether the subscription currently has any tables.
                               1623                 :                :  *
                               1624                 :                :  * Note: Unlike HasSubscriptionTables(), this function relies on cached
                               1625                 :                :  * information for subscription tables. Additionally, it should not be
                               1626                 :                :  * invoked outside of apply or tablesync workers, as MySubscription must be
                               1627                 :                :  * initialized first.
                               1628                 :                :  */
                               1629                 :                : bool
   62                          1630                 :             79 : HasSubscriptionTablesCached(void)
                               1631                 :                : {
                               1632                 :                :     bool        started_tx;
                               1633                 :                :     bool        has_tables;
                               1634                 :                : 
                               1635                 :                :     /* We need up-to-date subscription tables info here */
   42                          1636                 :             79 :     FetchRelationStates(&has_tables, NULL, &started_tx);
                               1637                 :                : 
  100                          1638         [ -  + ]:             79 :     if (started_tx)
                               1639                 :                :     {
  100 akapila@postgresql.o     1640                 :UNC           0 :         CommitTransactionCommand();
                               1641                 :              0 :         pgstat_report_stat(true);
                               1642                 :                :     }
                               1643                 :                : 
   42 akapila@postgresql.o     1644                 :GNC          79 :     return has_tables;
                               1645                 :                : }
                               1646                 :                : 
                               1647                 :                : /*
                               1648                 :                :  * Update the two_phase state of the specified subscription in pg_subscription.
                               1649                 :                :  */
                               1650                 :                : void
 1617 akapila@postgresql.o     1651                 :CBC          10 : UpdateTwoPhaseState(Oid suboid, char new_state)
                               1652                 :                : {
                               1653                 :                :     Relation    rel;
                               1654                 :                :     HeapTuple   tup;
                               1655                 :                :     bool        nulls[Natts_pg_subscription];
                               1656                 :                :     bool        replaces[Natts_pg_subscription];
                               1657                 :                :     Datum       values[Natts_pg_subscription];
                               1658                 :                : 
                               1659   [ +  -  +  -  :             10 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
                                              -  + ]
                               1660                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
                               1661                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
                               1662                 :                : 
                               1663                 :             10 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                               1664                 :             10 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
                               1665         [ -  + ]:             10 :     if (!HeapTupleIsValid(tup))
 1617 akapila@postgresql.o     1666         [ #  # ]:UBC           0 :         elog(ERROR,
                               1667                 :                :              "cache lookup failed for subscription oid %u",
                               1668                 :                :              suboid);
                               1669                 :                : 
                               1670                 :                :     /* Form a new tuple. */
 1617 akapila@postgresql.o     1671                 :CBC          10 :     memset(values, 0, sizeof(values));
                               1672                 :             10 :     memset(nulls, false, sizeof(nulls));
                               1673                 :             10 :     memset(replaces, false, sizeof(replaces));
                               1674                 :                : 
                               1675                 :                :     /* And update/set two_phase state */
                               1676                 :             10 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
                               1677                 :             10 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
                               1678                 :                : 
                               1679                 :             10 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                               1680                 :                :                             values, nulls, replaces);
                               1681                 :             10 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1682                 :                : 
                               1683                 :             10 :     heap_freetuple(tup);
                               1684                 :             10 :     table_close(rel, RowExclusiveLock);
                               1685                 :             10 : }
        

Generated by: LCOV version 2.4-beta