LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Coverage Total Hit LBC UBC CBC
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 91.6 % 548 502 46 502
Current Date: 2025-09-06 07:49:51 +0900 Functions: 100.0 % 19 19 19
Baseline: lcov-20250906-005545-baseline Branches: 64.9 % 325 211 3 111 211
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(30,360] days: 100.0 % 39 39 39
(360..) days: 91.0 % 509 463 46 463
Function coverage date bins:
(30,360] days: 100.0 % 1 1 1
(360..) days: 100.0 % 18 18 18
Branch coverage date bins:
(30,360] days: 66.7 % 30 20 1 9 20
(360..) days: 64.7 % 295 191 2 102 191

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * tablesync.c
                                  3                 :                :  *    PostgreSQL logical replication: initial table data synchronization
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/tablesync.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This file contains code for initial table data synchronization for
                                 12                 :                :  *    logical replication.
                                 13                 :                :  *
                                 14                 :                :  *    The initial data synchronization is done separately for each table,
                                 15                 :                :  *    in a separate apply worker that only fetches the initial snapshot data
                                 16                 :                :  *    from the publisher and then synchronizes the position in the stream with
                                 17                 :                :  *    the leader apply worker.
                                 18                 :                :  *
                                 19                 :                :  *    There are several reasons for doing the synchronization this way:
                                 20                 :                :  *     - It allows us to parallelize the initial data synchronization
                                 21                 :                :  *       which lowers the time needed for it to happen.
                                 22                 :                :  *     - The initial synchronization does not have to hold the xid and LSN
                                 23                 :                :  *       for the time it takes to copy data of all tables, causing less
                                 24                 :                :  *       bloat and lower disk consumption compared to doing the
                                 25                 :                :  *       synchronization in a single process for the whole database.
                                 26                 :                :  *     - It allows us to synchronize any tables added after the initial
                                 27                 :                :  *       synchronization has finished.
                                 28                 :                :  *
                                 29                 :                :  *    The stream position synchronization works in multiple steps:
                                 30                 :                :  *     - Apply worker requests a tablesync worker to start, setting the new
                                 31                 :                :  *       table state to INIT.
                                 32                 :                :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
                                 33                 :                :  *       copying.
                                 34                 :                :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
                                 35                 :                :  *       worker specific) state to indicate when the copy phase has completed, so
                                 36                 :                :  *       if the worker crashes with this (non-memory) state then the copy will not
                                 37                 :                :  *       be re-attempted.
                                 38                 :                :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
                                 39                 :                :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
                                 40                 :                :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
                                 41                 :                :  *       until either the table state is set to SYNCDONE or the sync worker
                                 42                 :                :  *       exits.
                                 43                 :                :  *     - After the sync worker has seen the state change to CATCHUP, it will
                                 44                 :                :  *       read the stream and apply changes (acting like an apply worker) until
                                 45                 :                :  *       it catches up to the specified stream position.  Then it sets the
                                 46                 :                :  *       state to SYNCDONE.  There might be zero changes applied between
                                 47                 :                :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
                                 48                 :                :  *       apply worker.
                                 49                 :                :  *     - Once the state is set to SYNCDONE, the apply will continue tracking
                                 50                 :                :  *       the table until it reaches the SYNCDONE stream position, at which
                                 51                 :                :  *       point it sets state to READY and stops tracking.  Again, there might
                                 52                 :                :  *       be zero changes in between.
                                 53                 :                :  *
                                 54                 :                :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
                                 55                 :                :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
                                 56                 :                :  *
                                 57                 :                :  *    The catalog pg_subscription_rel is used to keep information about
                                 58                 :                :  *    subscribed tables and their state.  The catalog holds all states
                                 59                 :                :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
                                 60                 :                :  *
                                 61                 :                :  *    Example flows look like this:
                                 62                 :                :  *     - Apply is in front:
                                 63                 :                :  *        sync:8
                                 64                 :                :  *          -> set in catalog FINISHEDCOPY
                                 65                 :                :  *          -> set in memory SYNCWAIT
                                 66                 :                :  *        apply:10
                                 67                 :                :  *          -> set in memory CATCHUP
                                 68                 :                :  *          -> enter wait-loop
                                 69                 :                :  *        sync:10
                                 70                 :                :  *          -> set in catalog SYNCDONE
                                 71                 :                :  *          -> exit
                                 72                 :                :  *        apply:10
                                 73                 :                :  *          -> exit wait-loop
                                 74                 :                :  *          -> continue rep
                                 75                 :                :  *        apply:11
                                 76                 :                :  *          -> set in catalog READY
                                 77                 :                :  *
                                 78                 :                :  *     - Sync is in front:
                                 79                 :                :  *        sync:10
                                 80                 :                :  *          -> set in catalog FINISHEDCOPY
                                 81                 :                :  *          -> set in memory SYNCWAIT
                                 82                 :                :  *        apply:8
                                 83                 :                :  *          -> set in memory CATCHUP
                                 84                 :                :  *          -> continue per-table filtering
                                 85                 :                :  *        sync:10
                                 86                 :                :  *          -> set in catalog SYNCDONE
                                 87                 :                :  *          -> exit
                                 88                 :                :  *        apply:10
                                 89                 :                :  *          -> set in catalog READY
                                 90                 :                :  *          -> stop per-table filtering
                                 91                 :                :  *          -> continue rep
                                 92                 :                :  *-------------------------------------------------------------------------
                                 93                 :                :  */
                                 94                 :                : 
                                 95                 :                : #include "postgres.h"
                                 96                 :                : 
                                 97                 :                : #include "access/table.h"
                                 98                 :                : #include "access/xact.h"
                                 99                 :                : #include "catalog/indexing.h"
                                100                 :                : #include "catalog/pg_subscription_rel.h"
                                101                 :                : #include "catalog/pg_type.h"
                                102                 :                : #include "commands/copy.h"
                                103                 :                : #include "miscadmin.h"
                                104                 :                : #include "nodes/makefuncs.h"
                                105                 :                : #include "parser/parse_relation.h"
                                106                 :                : #include "pgstat.h"
                                107                 :                : #include "replication/logicallauncher.h"
                                108                 :                : #include "replication/logicalrelation.h"
                                109                 :                : #include "replication/logicalworker.h"
                                110                 :                : #include "replication/origin.h"
                                111                 :                : #include "replication/slot.h"
                                112                 :                : #include "replication/walreceiver.h"
                                113                 :                : #include "replication/worker_internal.h"
                                114                 :                : #include "storage/ipc.h"
                                115                 :                : #include "storage/lmgr.h"
                                116                 :                : #include "utils/acl.h"
                                117                 :                : #include "utils/array.h"
                                118                 :                : #include "utils/builtins.h"
                                119                 :                : #include "utils/lsyscache.h"
                                120                 :                : #include "utils/memutils.h"
                                121                 :                : #include "utils/rls.h"
                                122                 :                : #include "utils/snapmgr.h"
                                123                 :                : #include "utils/syscache.h"
                                124                 :                : #include "utils/usercontext.h"
                                125                 :                : 
                                126                 :                : typedef enum
                                127                 :                : {
                                128                 :                :     SYNC_TABLE_STATE_NEEDS_REBUILD,
                                129                 :                :     SYNC_TABLE_STATE_REBUILD_STARTED,
                                130                 :                :     SYNC_TABLE_STATE_VALID,
                                131                 :                : } SyncingTablesState;
                                132                 :                : 
                                133                 :                : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
                                134                 :                : static List *table_states_not_ready = NIL;
                                135                 :                : static bool FetchTableStates(bool *started_tx);
                                136                 :                : 
                                137                 :                : static StringInfo copybuf = NULL;
                                138                 :                : 
                                139                 :                : /*
                                140                 :                :  * Exit routine for synchronization worker.
                                141                 :                :  */
                                142                 :                : pg_noreturn static void
 3089 peter_e@gmx.net           143                 :CBC         183 : finish_sync_worker(void)
                                144                 :                : {
                                145                 :                :     /*
                                146                 :                :      * Commit any outstanding transaction. This is the usual case, unless
                                147                 :                :      * there was nothing to do for the table.
                                148                 :                :      */
                                149         [ +  - ]:            183 :     if (IsTransactionState())
                                150                 :                :     {
                                151                 :            183 :         CommitTransactionCommand();
 1249 andres@anarazel.de        152                 :            183 :         pgstat_report_stat(true);
                                153                 :                :     }
                                154                 :                : 
                                155                 :                :     /* And flush all writes. */
 3089 peter_e@gmx.net           156                 :            183 :     XLogFlush(GetXLogWriteRecPtr());
                                157                 :                : 
 3027                           158                 :            183 :     StartTransactionCommand();
 3089                           159         [ +  - ]:            183 :     ereport(LOG,
                                160                 :                :             (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
                                161                 :                :                     MySubscription->name,
                                162                 :                :                     get_rel_name(MyLogicalRepWorker->relid))));
 3027                           163                 :            183 :     CommitTransactionCommand();
                                164                 :                : 
                                165                 :                :     /* Find the leader apply worker and signal it. */
 3014                           166                 :            183 :     logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
                                167                 :                : 
                                168                 :                :     /* Stop gracefully */
 3089                           169                 :            183 :     proc_exit(0);
                                170                 :                : }
                                171                 :                : 
                                172                 :                : /*
                                173                 :                :  * Wait until the relation sync state is set in the catalog to the expected
                                174                 :                :  * one; return true when it happens.
                                175                 :                :  *
                                176                 :                :  * Returns false if the table sync worker or the table itself have
                                177                 :                :  * disappeared, or the table state has been reset.
                                178                 :                :  *
                                179                 :                :  * Currently, this is used in the apply worker when transitioning from
                                180                 :                :  * CATCHUP state to SYNCDONE.
                                181                 :                :  */
                                182                 :                : static bool
 3014                           183                 :            181 : wait_for_relation_state_change(Oid relid, char expected_state)
                                184                 :                : {
                                185                 :                :     char        state;
                                186                 :                : 
                                187                 :                :     for (;;)
 3089                           188                 :            203 :     {
                                189                 :                :         LogicalRepWorker *worker;
                                190                 :                :         XLogRecPtr  statelsn;
                                191                 :                : 
 3018                           192         [ -  + ]:            384 :         CHECK_FOR_INTERRUPTS();
                                193                 :                : 
 1787 alvherre@alvh.no-ip.      194                 :            384 :         InvalidateCatalogSnapshot();
 3014 peter_e@gmx.net           195                 :            384 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                196                 :                :                                         relid, &statelsn);
                                197                 :                : 
                                198         [ -  + ]:            384 :         if (state == SUBREL_STATE_UNKNOWN)
 1787 alvherre@alvh.no-ip.      199                 :UBC           0 :             break;
                                200                 :                : 
 3014 peter_e@gmx.net           201         [ +  + ]:CBC         384 :         if (state == expected_state)
                                202                 :            181 :             return true;
                                203                 :                : 
                                204                 :                :         /* Check if the sync worker is still running and bail if not. */
 3089                           205                 :            203 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 1787 alvherre@alvh.no-ip.      206                 :            203 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
                                207                 :                :                                         false);
 3014 peter_e@gmx.net           208                 :            203 :         LWLockRelease(LogicalRepWorkerLock);
 3089                           209         [ -  + ]:            203 :         if (!worker)
 1787 alvherre@alvh.no-ip.      210                 :UBC           0 :             break;
                                211                 :                : 
 2479 tmunro@postgresql.or      212                 :CBC         203 :         (void) WaitLatch(MyLatch,
                                213                 :                :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                214                 :                :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                215                 :                : 
 3014 andres@anarazel.de        216                 :            203 :         ResetLatch(MyLatch);
                                217                 :                :     }
                                218                 :                : 
 3014 peter_e@gmx.net           219                 :UBC           0 :     return false;
                                220                 :                : }
                                221                 :                : 
                                222                 :                : /*
                                223                 :                :  * Wait until the apply worker changes the state of our synchronization
                                224                 :                :  * worker to the expected one.
                                225                 :                :  *
                                226                 :                :  * Used when transitioning from SYNCWAIT state to CATCHUP.
                                227                 :                :  *
                                228                 :                :  * Returns false if the apply worker has disappeared.
                                229                 :                :  */
                                230                 :                : static bool
 3014 peter_e@gmx.net           231                 :CBC         183 : wait_for_worker_state_change(char expected_state)
                                232                 :                : {
                                233                 :                :     int         rc;
                                234                 :                : 
                                235                 :                :     for (;;)
                                236                 :            184 :     {
                                237                 :                :         LogicalRepWorker *worker;
                                238                 :                : 
                                239         [ -  + ]:            367 :         CHECK_FOR_INTERRUPTS();
                                240                 :                : 
                                241                 :                :         /*
                                242                 :                :          * Done if already in correct state.  (We assume this fetch is atomic
                                243                 :                :          * enough to not give a misleading answer if we do it with no lock.)
                                244                 :                :          */
 2990 tgl@sss.pgh.pa.us         245         [ +  + ]:            367 :         if (MyLogicalRepWorker->relstate == expected_state)
                                246                 :            183 :             return true;
                                247                 :                : 
                                248                 :                :         /*
                                249                 :                :          * Bail out if the apply worker has died, else signal it we're
                                250                 :                :          * waiting.
                                251                 :                :          */
 3014 peter_e@gmx.net           252                 :            184 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                253                 :            184 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                254                 :                :                                         InvalidOid, false);
 2990 tgl@sss.pgh.pa.us         255   [ +  -  +  - ]:            184 :         if (worker && worker->proc)
                                256                 :            184 :             logicalrep_worker_wakeup_ptr(worker);
 3014 peter_e@gmx.net           257                 :            184 :         LWLockRelease(LogicalRepWorkerLock);
                                258         [ -  + ]:            184 :         if (!worker)
 2990 tgl@sss.pgh.pa.us         259                 :UBC           0 :             break;
                                260                 :                : 
                                261                 :                :         /*
                                262                 :                :          * Wait.  We expect to get a latch signal back from the apply worker,
                                263                 :                :          * but use a timeout in case it dies without sending one.
                                264                 :                :          */
 3014 andres@anarazel.de        265                 :CBC         184 :         rc = WaitLatch(MyLatch,
                                266                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                267                 :                :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                268                 :                : 
 2990 tgl@sss.pgh.pa.us         269         [ +  - ]:            184 :         if (rc & WL_LATCH_SET)
                                270                 :            184 :             ResetLatch(MyLatch);
                                271                 :                :     }
                                272                 :                : 
 3089 peter_e@gmx.net           273                 :UBC           0 :     return false;
                                274                 :                : }
                                275                 :                : 
                                276                 :                : /*
                                277                 :                :  * Callback from syscache invalidation.
                                278                 :                :  */
                                279                 :                : void
 3089 peter_e@gmx.net           280                 :CBC        1715 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
                                281                 :                : {
  499 akapila@postgresql.o      282                 :           1715 :     table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
 3089 peter_e@gmx.net           283                 :           1715 : }
                                284                 :                : 
                                285                 :                : /*
                                286                 :                :  * Handle table synchronization cooperation from the synchronization
                                287                 :                :  * worker.
                                288                 :                :  *
                                289                 :                :  * If the sync worker is in CATCHUP state and reached (or passed) the
                                290                 :                :  * predetermined synchronization point in the WAL stream, mark the table as
                                291                 :                :  * SYNCDONE and finish.
                                292                 :                :  */
                                293                 :                : static void
                                294                 :            200 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                295                 :                : {
                                296         [ -  + ]:            200 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                                297                 :                : 
                                298         [ +  - ]:            200 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
                                299         [ +  + ]:            200 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
                                300                 :                :     {
                                301                 :                :         TimeLineID  tli;
 1667 akapila@postgresql.o      302                 :            183 :         char        syncslotname[NAMEDATALEN] = {0};
 1103                           303                 :            183 :         char        originname[NAMEDATALEN] = {0};
                                304                 :                : 
 3014 peter_e@gmx.net           305                 :            183 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 3089                           306                 :            183 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
                                307                 :                : 
                                308                 :            183 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                309                 :                : 
                                310                 :                :         /*
                                311                 :                :          * UpdateSubscriptionRelState must be called within a transaction.
                                312                 :                :          */
 1667 akapila@postgresql.o      313         [ +  - ]:            183 :         if (!IsTransactionState())
                                314                 :            183 :             StartTransactionCommand();
                                315                 :                : 
 2710 peter_e@gmx.net           316                 :            183 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                317                 :            183 :                                    MyLogicalRepWorker->relid,
                                318                 :            183 :                                    MyLogicalRepWorker->relstate,
   36 akapila@postgresql.o      319                 :            183 :                                    MyLogicalRepWorker->relstate_lsn,
                                320                 :                :                                    false);
                                321                 :                : 
                                322                 :                :         /*
                                323                 :                :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                                324                 :                :          * the slot.
                                325                 :                :          */
 1578 alvherre@alvh.no-ip.      326                 :            183 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                                327                 :                : 
                                328                 :                :         /*
                                329                 :                :          * Cleanup the tablesync slot.
                                330                 :                :          *
                                331                 :                :          * This has to be done after updating the state because otherwise if
                                332                 :                :          * there is an error while doing the database operations we won't be
                                333                 :                :          * able to rollback dropped slot.
                                334                 :                :          */
 1667 akapila@postgresql.o      335                 :            183 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                336                 :            183 :                                         MyLogicalRepWorker->relid,
                                337                 :                :                                         syncslotname,
                                338                 :                :                                         sizeof(syncslotname));
                                339                 :                : 
                                340                 :                :         /*
                                341                 :                :          * It is important to give an error if we are unable to drop the slot,
                                342                 :                :          * otherwise, it won't be dropped till the corresponding subscription
                                343                 :                :          * is dropped. So passing missing_ok = false.
                                344                 :                :          */
 1578 alvherre@alvh.no-ip.      345                 :            183 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
                                346                 :                : 
 1090 akapila@postgresql.o      347                 :            183 :         CommitTransactionCommand();
                                348                 :            183 :         pgstat_report_stat(false);
                                349                 :                : 
                                350                 :                :         /*
                                351                 :                :          * Start a new transaction to clean up the tablesync origin tracking.
                                352                 :                :          * This transaction will be ended within the finish_sync_worker().
                                353                 :                :          * Now, even, if we fail to remove this here, the apply worker will
                                354                 :                :          * ensure to clean it up afterward.
                                355                 :                :          *
                                356                 :                :          * We need to do this after the table state is set to SYNCDONE.
                                357                 :                :          * Otherwise, if an error occurs while performing the database
                                358                 :                :          * operation, the worker will be restarted and the in-memory state of
                                359                 :                :          * replication progress (remote_lsn) won't be rolled-back which would
                                360                 :                :          * have been cleared before restart. So, the restarted worker will use
                                361                 :                :          * invalid replication progress state resulting in replay of
                                362                 :                :          * transactions that have already been applied.
                                363                 :                :          */
                                364                 :            183 :         StartTransactionCommand();
                                365                 :                : 
 1061                           366                 :            183 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                367                 :            183 :                                            MyLogicalRepWorker->relid,
                                368                 :                :                                            originname,
                                369                 :                :                                            sizeof(originname));
                                370                 :                : 
                                371                 :                :         /*
                                372                 :                :          * Resetting the origin session removes the ownership of the slot.
                                373                 :                :          * This is needed to allow the origin to be dropped.
                                374                 :                :          */
 1090                           375                 :            183 :         replorigin_session_reset();
                                376                 :            183 :         replorigin_session_origin = InvalidRepOriginId;
                                377                 :            183 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                378                 :            183 :         replorigin_session_origin_timestamp = 0;
                                379                 :                : 
                                380                 :                :         /*
                                381                 :                :          * Drop the tablesync's origin tracking if exists.
                                382                 :                :          *
                                383                 :                :          * There is a chance that the user is concurrently performing refresh
                                384                 :                :          * for the subscription where we remove the table state and its origin
                                385                 :                :          * or the apply worker would have removed this origin. So passing
                                386                 :                :          * missing_ok = true.
                                387                 :                :          */
                                388                 :            183 :         replorigin_drop_by_name(originname, true, false);
                                389                 :                : 
 3089 peter_e@gmx.net           390                 :            183 :         finish_sync_worker();
                                391                 :                :     }
                                392                 :                :     else
                                393                 :             17 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                394                 :             17 : }
                                395                 :                : 
                                396                 :                : /*
                                397                 :                :  * Handle table synchronization cooperation from the apply worker.
                                398                 :                :  *
                                399                 :                :  * Walk over all subscription tables that are individually tracked by the
                                400                 :                :  * apply process (currently, all that have state other than
                                401                 :                :  * SUBREL_STATE_READY) and manage synchronization for them.
                                402                 :                :  *
                                403                 :                :  * If there are tables that need synchronizing and are not being synchronized
                                404                 :                :  * yet, start sync workers for them (if there are free slots for sync
                                405                 :                :  * workers).  To prevent starting the sync worker for the same relation at a
                                406                 :                :  * high frequency after a failure, we store its last start time with each sync
                                407                 :                :  * state info.  We start the sync worker for the same relation after waiting
                                408                 :                :  * at least wal_retrieve_retry_interval.
                                409                 :                :  *
                                410                 :                :  * For tables that are being synchronized already, check if sync workers
                                411                 :                :  * either need action from the apply worker or have finished.  This is the
                                412                 :                :  * SYNCWAIT to CATCHUP transition.
                                413                 :                :  *
                                414                 :                :  * If the synchronization position is reached (SYNCDONE), then the table can
                                415                 :                :  * be marked as READY and is no longer tracked.
                                416                 :                :  */
                                417                 :                : static void
                                418                 :           3559 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                419                 :                : {
                                420                 :                :     struct tablesync_start_time_mapping
                                421                 :                :     {
                                422                 :                :         Oid         relid;
                                423                 :                :         TimestampTz last_start_time;
                                424                 :                :     };
                                425                 :                :     static HTAB *last_start_times = NULL;
                                426                 :                :     ListCell   *lc;
 3043                           427                 :           3559 :     bool        started_tx = false;
  974 tgl@sss.pgh.pa.us         428                 :           3559 :     bool        should_exit = false;
   36 akapila@postgresql.o      429                 :           3559 :     Relation    rel = NULL;
                                430                 :                : 
 3089 peter_e@gmx.net           431         [ -  + ]:           3559 :     Assert(!IsTransactionState());
                                432                 :                : 
                                433                 :                :     /* We need up-to-date sync state info for subscription tables here. */
 1515 akapila@postgresql.o      434                 :           3559 :     FetchTableStates(&started_tx);
                                435                 :                : 
                                436                 :                :     /*
                                437                 :                :      * Prepare a hash table for tracking last start times of workers, to avoid
                                438                 :                :      * immediate restarts.  We don't need it if there are no tables that need
                                439                 :                :      * syncing.
                                440                 :                :      */
 1116 tgl@sss.pgh.pa.us         441   [ +  +  +  + ]:           3559 :     if (table_states_not_ready != NIL && !last_start_times)
 3054 peter_e@gmx.net           442                 :            122 :     {
                                443                 :                :         HASHCTL     ctl;
                                444                 :                : 
                                445                 :            122 :         ctl.keysize = sizeof(Oid);
                                446                 :            122 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
                                447                 :            122 :         last_start_times = hash_create("Logical replication table sync worker start times",
                                448                 :                :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
                                449                 :                :     }
                                450                 :                : 
                                451                 :                :     /*
                                452                 :                :      * Clean up the hash table when we're done with all tables (just to
                                453                 :                :      * release the bit of memory).
                                454                 :                :      */
 1116 tgl@sss.pgh.pa.us         455   [ +  +  +  + ]:           3437 :     else if (table_states_not_ready == NIL && last_start_times)
                                456                 :                :     {
 3054 peter_e@gmx.net           457                 :             91 :         hash_destroy(last_start_times);
                                458                 :             91 :         last_start_times = NULL;
                                459                 :                :     }
                                460                 :                : 
                                461                 :                :     /*
                                462                 :                :      * Process all tables that are being synchronized.
                                463                 :                :      */
 1515 akapila@postgresql.o      464   [ +  +  +  +  :           5381 :     foreach(lc, table_states_not_ready)
                                              +  + ]
                                465                 :                :     {
 3034 bruce@momjian.us          466                 :           1825 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
                                467                 :                : 
 3089 peter_e@gmx.net           468         [ +  + ]:           1825 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
                                469                 :                :         {
                                470                 :                :             /*
                                471                 :                :              * Apply has caught up to the position where the table sync has
                                472                 :                :              * finished.  Mark the table as ready so that the apply will just
                                473                 :                :              * continue to replicate it normally.
                                474                 :                :              */
                                475         [ +  + ]:            182 :             if (current_lsn >= rstate->lsn)
                                476                 :                :             {
                                477                 :                :                 char        originname[NAMEDATALEN];
                                478                 :                : 
                                479                 :            181 :                 rstate->state = SUBREL_STATE_READY;
                                480                 :            181 :                 rstate->lsn = current_lsn;
 3043                           481         [ +  + ]:            181 :                 if (!started_tx)
                                482                 :                :                 {
                                483                 :              9 :                     StartTransactionCommand();
                                484                 :              9 :                     started_tx = true;
                                485                 :                :                 }
                                486                 :                : 
                                487                 :                :                 /*
                                488                 :                :                  * Remove the tablesync origin tracking if exists.
                                489                 :                :                  *
                                490                 :                :                  * There is a chance that the user is concurrently performing
                                491                 :                :                  * refresh for the subscription where we remove the table
                                492                 :                :                  * state and its origin or the tablesync worker would have
                                493                 :                :                  * already removed this origin. We can't rely on tablesync
                                494                 :                :                  * worker to remove the origin tracking as if there is any
                                495                 :                :                  * error while dropping we won't restart it to drop the
                                496                 :                :                  * origin. So passing missing_ok = true.
                                497                 :                :                  *
                                498                 :                :                  * Lock the subscription and origin in the same order as we
                                499                 :                :                  * are doing during DDL commands to avoid deadlocks. See
                                500                 :                :                  * AlterSubscription_refresh.
                                501                 :                :                  */
   36 akapila@postgresql.o      502                 :            181 :                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
                                503                 :                :                                  0, AccessShareLock);
                                504                 :                : 
                                505         [ +  - ]:            181 :                 if (!rel)
                                506                 :            181 :                     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                507                 :                : 
 1061                           508                 :            181 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                509                 :                :                                                    rstate->relid,
                                510                 :                :                                                    originname,
                                511                 :                :                                                    sizeof(originname));
 1090                           512                 :            181 :                 replorigin_drop_by_name(originname, true, false);
                                513                 :                : 
                                514                 :                :                 /*
                                515                 :                :                  * Update the state to READY only after the origin cleanup.
                                516                 :                :                  */
 2710 peter_e@gmx.net           517                 :            181 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                518                 :            181 :                                            rstate->relid, rstate->state,
                                519                 :                :                                            rstate->lsn, true);
                                520                 :                :             }
                                521                 :                :         }
                                522                 :                :         else
                                523                 :                :         {
                                524                 :                :             LogicalRepWorker *syncworker;
                                525                 :                : 
                                526                 :                :             /*
                                527                 :                :              * Look for a sync worker for this relation.
                                528                 :                :              */
 3089                           529                 :           1643 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                530                 :                : 
                                531                 :           1643 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                532                 :                :                                                 rstate->relid, false);
                                533                 :                : 
                                534         [ +  + ]:           1643 :             if (syncworker)
                                535                 :                :             {
                                536                 :                :                 /* Found one, update our copy of its state */
                                537         [ -  + ]:            740 :                 SpinLockAcquire(&syncworker->relmutex);
                                538                 :            740 :                 rstate->state = syncworker->relstate;
                                539                 :            740 :                 rstate->lsn = syncworker->relstate_lsn;
 2990 tgl@sss.pgh.pa.us         540         [ +  + ]:            740 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                541                 :                :                 {
                                542                 :                :                     /*
                                543                 :                :                      * Sync worker is waiting for apply.  Tell sync worker it
                                544                 :                :                      * can catchup now.
                                545                 :                :                      */
                                546                 :            181 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
                                547                 :            181 :                     syncworker->relstate_lsn =
                                548                 :            181 :                         Max(syncworker->relstate_lsn, current_lsn);
                                549                 :                :                 }
 3089 peter_e@gmx.net           550                 :            740 :                 SpinLockRelease(&syncworker->relmutex);
                                551                 :                : 
                                552                 :                :                 /* If we told worker to catch up, wait for it. */
 2990 tgl@sss.pgh.pa.us         553         [ +  + ]:            740 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                554                 :                :                 {
                                555                 :                :                     /* Signal the sync worker, as it may be waiting for us. */
                                556         [ +  - ]:            181 :                     if (syncworker->proc)
                                557                 :            181 :                         logicalrep_worker_wakeup_ptr(syncworker);
                                558                 :                : 
                                559                 :                :                     /* Now safe to release the LWLock */
                                560                 :            181 :                     LWLockRelease(LogicalRepWorkerLock);
                                561                 :                : 
  635 akapila@postgresql.o      562         [ +  - ]:            181 :                     if (started_tx)
                                563                 :                :                     {
                                564                 :                :                         /*
                                565                 :                :                          * We must commit the existing transaction to release
                                566                 :                :                          * the existing locks before entering a busy loop.
                                567                 :                :                          * This is required to avoid any undetected deadlocks
                                568                 :                :                          * due to any existing lock as deadlock detector won't
                                569                 :                :                          * be able to detect the waits on the latch.
                                570                 :                :                          *
                                571                 :                :                          * Also close any tables prior to the commit.
                                572                 :                :                          */
   36                           573         [ +  + ]:            181 :                         if (rel)
                                574                 :                :                         {
                                575                 :             28 :                             table_close(rel, NoLock);
                                576                 :             28 :                             rel = NULL;
                                577                 :                :                         }
  635                           578                 :            181 :                         CommitTransactionCommand();
                                579                 :            181 :                         pgstat_report_stat(false);
                                580                 :                :                     }
                                581                 :                : 
                                582                 :                :                     /*
                                583                 :                :                      * Enter busy loop and wait for synchronization worker to
                                584                 :                :                      * reach expected state (or die trying).
                                585                 :                :                      */
                                586                 :            181 :                     StartTransactionCommand();
                                587                 :            181 :                     started_tx = true;
                                588                 :                : 
 2990 tgl@sss.pgh.pa.us         589                 :            181 :                     wait_for_relation_state_change(rstate->relid,
                                590                 :                :                                                    SUBREL_STATE_SYNCDONE);
                                591                 :                :                 }
                                592                 :                :                 else
                                593                 :            559 :                     LWLockRelease(LogicalRepWorkerLock);
                                594                 :                :             }
                                595                 :                :             else
                                596                 :                :             {
                                597                 :                :                 /*
                                598                 :                :                  * If there is no sync worker for this table yet, count
                                599                 :                :                  * running sync workers for this subscription, while we have
                                600                 :                :                  * the lock.
                                601                 :                :                  */
                                602                 :                :                 int         nsyncworkers =
  841                           603                 :            903 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
                                604                 :                : 
                                605                 :                :                 /* Now safe to release the LWLock */
 2990                           606                 :            903 :                 LWLockRelease(LogicalRepWorkerLock);
                                607                 :                : 
                                608                 :                :                 /*
                                609                 :                :                  * If there are free sync worker slot(s), start a new sync
                                610                 :                :                  * worker for the table.
                                611                 :                :                  */
                                612         [ +  + ]:            903 :                 if (nsyncworkers < max_sync_workers_per_subscription)
                                613                 :                :                 {
                                614                 :            203 :                     TimestampTz now = GetCurrentTimestamp();
                                615                 :                :                     struct tablesync_start_time_mapping *hentry;
                                616                 :                :                     bool        found;
                                617                 :                : 
                                618                 :            203 :                     hentry = hash_search(last_start_times, &rstate->relid,
                                619                 :                :                                          HASH_ENTER, &found);
                                620                 :                : 
                                621   [ +  +  +  + ]:            216 :                     if (!found ||
                                622                 :             13 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
                                623                 :                :                                                    wal_retrieve_retry_interval))
                                624                 :                :                     {
                                625                 :                :                         /*
                                626                 :                :                          * Set the last_start_time even if we fail to start
                                627                 :                :                          * the worker, so that we won't retry until
                                628                 :                :                          * wal_retrieve_retry_interval has elapsed.
                                629                 :                :                          */
                                630                 :            196 :                         hentry->last_start_time = now;
   74                           631                 :            196 :                         (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
                                632                 :            196 :                                                         MyLogicalRepWorker->dbid,
                                633                 :            196 :                                                         MySubscription->oid,
                                634                 :            196 :                                                         MySubscription->name,
                                635                 :            196 :                                                         MyLogicalRepWorker->userid,
                                636                 :                :                                                         rstate->relid,
                                637                 :                :                                                         DSM_HANDLE_INVALID,
                                638                 :                :                                                         false);
                                639                 :                :                     }
                                640                 :                :                 }
                                641                 :                :             }
                                642                 :                :         }
                                643                 :                :     }
                                644                 :                : 
                                645                 :                :     /* Close table if opened */
   36 akapila@postgresql.o      646         [ +  + ]:           3556 :     if (rel)
                                647                 :            153 :         table_close(rel, NoLock);
                                648                 :                : 
                                649                 :                : 
 3043 peter_e@gmx.net           650         [ +  + ]:           3556 :     if (started_tx)
                                651                 :                :     {
                                652                 :                :         /*
                                653                 :                :          * Even when the two_phase mode is requested by the user, it remains
                                654                 :                :          * as 'pending' until all tablesyncs have reached READY state.
                                655                 :                :          *
                                656                 :                :          * When this happens, we restart the apply worker and (if the
                                657                 :                :          * conditions are still ok) then the two_phase tri-state will become
                                658                 :                :          * 'enabled' at that time.
                                659                 :                :          *
                                660                 :                :          * Note: If the subscription has no tables then leave the state as
                                661                 :                :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
                                662                 :                :          * work.
                                663                 :                :          */
  974 tgl@sss.pgh.pa.us         664         [ +  + ]:            859 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
                                665                 :                :         {
                                666                 :             25 :             CommandCounterIncrement();  /* make updates visible */
                                667         [ +  + ]:             25 :             if (AllTablesyncsReady())
                                668                 :                :             {
                                669         [ +  - ]:              6 :                 ereport(LOG,
                                670                 :                :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                671                 :                :                                 MySubscription->name)));
                                672                 :              6 :                 should_exit = true;
                                673                 :                :             }
                                674                 :                :         }
                                675                 :                : 
 3043 peter_e@gmx.net           676                 :            859 :         CommitTransactionCommand();
 1249 andres@anarazel.de        677                 :            859 :         pgstat_report_stat(true);
                                678                 :                :     }
                                679                 :                : 
  974 tgl@sss.pgh.pa.us         680         [ +  + ]:           3556 :     if (should_exit)
                                681                 :                :     {
                                682                 :                :         /*
                                683                 :                :          * Reset the last-start time for this worker so that the launcher will
                                684                 :                :          * restart it without waiting for wal_retrieve_retry_interval.
                                685                 :                :          */
  958                           686                 :              6 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
                                687                 :                : 
  974                           688                 :              6 :         proc_exit(0);
                                689                 :                :     }
 3089 peter_e@gmx.net           690                 :           3550 : }
                                691                 :                : 
                                692                 :                : /*
                                693                 :                :  * Process possible state change(s) of tables that are being synchronized.
                                694                 :                :  */
                                695                 :                : void
                                696                 :           3781 : process_syncing_tables(XLogRecPtr current_lsn)
                                697                 :                : {
  746 akapila@postgresql.o      698   [ +  +  +  -  :           3781 :     switch (MyLogicalRepWorker->type)
                                                 - ]
                                699                 :                :     {
                                700                 :             22 :         case WORKERTYPE_PARALLEL_APPLY:
                                701                 :                : 
                                702                 :                :             /*
                                703                 :                :              * Skip for parallel apply workers because they only operate on
                                704                 :                :              * tables that are in a READY state. See pa_can_start() and
                                705                 :                :              * should_apply_changes_for_rel().
                                706                 :                :              */
                                707                 :             22 :             break;
                                708                 :                : 
                                709                 :            200 :         case WORKERTYPE_TABLESYNC:
                                710                 :            200 :             process_syncing_tables_for_sync(current_lsn);
                                711                 :             17 :             break;
                                712                 :                : 
                                713                 :           3559 :         case WORKERTYPE_APPLY:
                                714                 :           3559 :             process_syncing_tables_for_apply(current_lsn);
                                715                 :           3550 :             break;
                                716                 :                : 
  746 akapila@postgresql.o      717                 :UBC           0 :         case WORKERTYPE_UNKNOWN:
                                718                 :                :             /* Should never happen. */
                                719         [ #  # ]:              0 :             elog(ERROR, "Unknown worker type");
                                720                 :                :     }
 3089 peter_e@gmx.net           721                 :CBC        3589 : }
                                722                 :                : 
                                723                 :                : /*
                                724                 :                :  * Create list of columns for COPY based on logical relation mapping.
                                725                 :                :  */
                                726                 :                : static List *
                                727                 :            192 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
                                728                 :                : {
                                729                 :            192 :     List       *attnamelist = NIL;
                                730                 :                :     int         i;
                                731                 :                : 
 3033                           732         [ +  + ]:            515 :     for (i = 0; i < rel->remoterel.natts; i++)
                                733                 :                :     {
 3089                           734                 :            323 :         attnamelist = lappend(attnamelist,
 3033                           735                 :            323 :                               makeString(rel->remoterel.attnames[i]));
                                736                 :                :     }
                                737                 :                : 
                                738                 :                : 
 3089                           739                 :            192 :     return attnamelist;
                                740                 :                : }
                                741                 :                : 
                                742                 :                : /*
                                743                 :                :  * Data source callback for the COPY FROM, which reads from the remote
                                744                 :                :  * connection and passes the data back to our local COPY.
                                745                 :                :  */
                                746                 :                : static int
                                747                 :          13984 : copy_read_data(void *outbuf, int minread, int maxread)
                                748                 :                : {
 3034 bruce@momjian.us          749                 :          13984 :     int         bytesread = 0;
                                750                 :                :     int         avail;
                                751                 :                : 
                                752                 :                :     /* If there are some leftover data from previous read, use it. */
 3089 peter_e@gmx.net           753                 :          13984 :     avail = copybuf->len - copybuf->cursor;
                                754         [ -  + ]:          13984 :     if (avail)
                                755                 :                :     {
 3089 peter_e@gmx.net           756         [ #  # ]:UBC           0 :         if (avail > maxread)
                                757                 :              0 :             avail = maxread;
                                758                 :              0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                759                 :              0 :         copybuf->cursor += avail;
                                760                 :              0 :         maxread -= avail;
                                761                 :              0 :         bytesread += avail;
                                762                 :                :     }
                                763                 :                : 
 3018 peter_e@gmx.net           764   [ +  -  +  - ]:CBC       13984 :     while (maxread > 0 && bytesread < minread)
                                765                 :                :     {
 3089                           766                 :          13984 :         pgsocket    fd = PGINVALID_SOCKET;
                                767                 :                :         int         len;
                                768                 :          13984 :         char       *buf = NULL;
                                769                 :                : 
                                770                 :                :         for (;;)
                                771                 :                :         {
                                772                 :                :             /* Try read the data. */
 1578 alvherre@alvh.no-ip.      773                 :          13984 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                                774                 :                : 
 3089 peter_e@gmx.net           775         [ -  + ]:          13984 :             CHECK_FOR_INTERRUPTS();
                                776                 :                : 
                                777         [ -  + ]:          13984 :             if (len == 0)
 3089 peter_e@gmx.net           778                 :UBC           0 :                 break;
 3089 peter_e@gmx.net           779         [ +  + ]:CBC       13984 :             else if (len < 0)
                                780                 :          13984 :                 return bytesread;
                                781                 :                :             else
                                782                 :                :             {
                                783                 :                :                 /* Process the data */
                                784                 :          13794 :                 copybuf->data = buf;
                                785                 :          13794 :                 copybuf->len = len;
                                786                 :          13794 :                 copybuf->cursor = 0;
                                787                 :                : 
                                788                 :          13794 :                 avail = copybuf->len - copybuf->cursor;
                                789         [ -  + ]:          13794 :                 if (avail > maxread)
 3089 peter_e@gmx.net           790                 :UBC           0 :                     avail = maxread;
 3089 peter_e@gmx.net           791                 :CBC       13794 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
  282 peter@eisentraut.org      792                 :          13794 :                 outbuf = (char *) outbuf + avail;
 3089 peter_e@gmx.net           793                 :          13794 :                 copybuf->cursor += avail;
                                794                 :          13794 :                 maxread -= avail;
                                795                 :          13794 :                 bytesread += avail;
                                796                 :                :             }
                                797                 :                : 
                                798   [ +  -  +  - ]:          13794 :             if (maxread <= 0 || bytesread >= minread)
                                799                 :          13794 :                 return bytesread;
                                800                 :                :         }
                                801                 :                : 
                                802                 :                :         /*
                                803                 :                :          * Wait for more data or latch.
                                804                 :                :          */
 2479 tmunro@postgresql.or      805                 :UBC           0 :         (void) WaitLatchOrSocket(MyLatch,
                                806                 :                :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
                                807                 :                :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                808                 :                :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
                                809                 :                : 
 3014 andres@anarazel.de        810                 :              0 :         ResetLatch(MyLatch);
                                811                 :                :     }
                                812                 :                : 
 3089 peter_e@gmx.net           813                 :              0 :     return bytesread;
                                814                 :                : }
                                815                 :                : 
                                816                 :                : 
                                817                 :                : /*
                                818                 :                :  * Get information about remote relation in similar fashion the RELATION
                                819                 :                :  * message provides during replication.
                                820                 :                :  *
                                821                 :                :  * This function also returns (a) the relation qualifications to be used in
                                822                 :                :  * the COPY command, and (b) whether the remote relation has published any
                                823                 :                :  * generated column.
                                824                 :                :  */
                                825                 :                : static void
  311 akapila@postgresql.o      826                 :CBC         194 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
                                827                 :                :                         List **qual, bool *gencol_published)
                                828                 :                : {
                                829                 :                :     WalRcvExecResult *res;
                                830                 :                :     StringInfoData cmd;
                                831                 :                :     TupleTableSlot *slot;
 1997 peter@eisentraut.org      832                 :            194 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
  311 akapila@postgresql.o      833                 :            194 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 1292                           834                 :            194 :     Oid         qualRow[] = {TEXTOID};
                                835                 :                :     bool        isnull;
                                836                 :                :     int         natt;
  316 michael@paquier.xyz       837                 :            194 :     StringInfo  pub_names = NULL;
 1260 tomas.vondra@postgre      838                 :            194 :     Bitmapset  *included_cols = NULL;
  311 akapila@postgresql.o      839                 :            194 :     int         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
                                840                 :                : 
 3089 peter_e@gmx.net           841                 :            194 :     lrel->nspname = nspname;
                                842                 :            194 :     lrel->relname = relname;
                                843                 :                : 
                                844                 :                :     /* First fetch Oid and replica identity. */
                                845                 :            194 :     initStringInfo(&cmd);
 1997 peter@eisentraut.org      846                 :            194 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
                                847                 :                :                      "  FROM pg_catalog.pg_class c"
                                848                 :                :                      "  INNER JOIN pg_catalog.pg_namespace n"
                                849                 :                :                      "        ON (c.relnamespace = n.oid)"
                                850                 :                :                      " WHERE n.nspname = %s"
                                851                 :                :                      "   AND c.relname = %s",
                                852                 :                :                      quote_literal_cstr(nspname),
                                853                 :                :                      quote_literal_cstr(relname));
 1578 alvherre@alvh.no-ip.      854                 :            194 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                855                 :                :                       lengthof(tableRow), tableRow);
                                856                 :                : 
 3089 peter_e@gmx.net           857         [ -  + ]:            194 :     if (res->status != WALRCV_OK_TUPLES)
 3089 peter_e@gmx.net           858         [ #  # ]:UBC           0 :         ereport(ERROR,
                                859                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                860                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                861                 :                :                         nspname, relname, res->err)));
                                862                 :                : 
 2487 andres@anarazel.de        863                 :CBC         194 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3089 peter_e@gmx.net           864         [ -  + ]:            194 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 3089 peter_e@gmx.net           865         [ #  # ]:UBC           0 :         ereport(ERROR,
                                866                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                867                 :                :                  errmsg("table \"%s.%s\" not found on publisher",
                                868                 :                :                         nspname, relname)));
                                869                 :                : 
 3089 peter_e@gmx.net           870                 :CBC         194 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
                                871         [ -  + ]:            194 :     Assert(!isnull);
                                872                 :            194 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
                                873         [ -  + ]:            194 :     Assert(!isnull);
 1997 peter@eisentraut.org      874                 :            194 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
                                875         [ -  + ]:            194 :     Assert(!isnull);
                                876                 :                : 
 3089 peter_e@gmx.net           877                 :            194 :     ExecDropSingleTupleTableSlot(slot);
                                878                 :            194 :     walrcv_clear_result(res);
                                879                 :                : 
                                880                 :                : 
                                881                 :                :     /*
                                882                 :                :      * Get column lists for each relation.
                                883                 :                :      *
                                884                 :                :      * We need to do this before fetching info about column names and types,
                                885                 :                :      * so that we can skip columns that should not be replicated.
                                886                 :                :      */
  311 akapila@postgresql.o      887         [ +  - ]:            194 :     if (server_version >= 150000)
                                888                 :                :     {
                                889                 :                :         WalRcvExecResult *pubres;
                                890                 :                :         TupleTableSlot *tslot;
 1192                           891                 :            194 :         Oid         attrsRow[] = {INT2VECTOROID};
                                892                 :                : 
                                893                 :                :         /* Build the pub_names comma-separated string. */
  316 michael@paquier.xyz       894                 :            194 :         pub_names = makeStringInfo();
                                895                 :            194 :         GetPublicationsStr(MySubscription->publications, pub_names, true);
                                896                 :                : 
                                897                 :                :         /*
                                898                 :                :          * Fetch info about column lists for the relation (from all the
                                899                 :                :          * publications).
                                900                 :                :          */
 1260 tomas.vondra@postgre      901                 :            194 :         resetStringInfo(&cmd);
                                902                 :            194 :         appendStringInfo(&cmd,
                                903                 :                :                          "SELECT DISTINCT"
                                904                 :                :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                                905                 :                :                          "   THEN NULL ELSE gpt.attrs END)"
                                906                 :                :                          "  FROM pg_publication p,"
                                907                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                                908                 :                :                          "  pg_class c"
                                909                 :                :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                                910                 :                :                          "   AND p.pubname IN ( %s )",
                                911                 :                :                          lrel->remoteid,
                                912                 :                :                          pub_names->data);
                                913                 :                : 
                                914                 :            194 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                915                 :                :                              lengthof(attrsRow), attrsRow);
                                916                 :                : 
                                917         [ -  + ]:            194 :         if (pubres->status != WALRCV_OK_TUPLES)
 1260 tomas.vondra@postgre      918         [ #  # ]:UBC           0 :             ereport(ERROR,
                                919                 :                :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                                920                 :                :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                                921                 :                :                             nspname, relname, pubres->err)));
                                922                 :                : 
                                923                 :                :         /*
                                924                 :                :          * We don't support the case where the column list is different for
                                925                 :                :          * the same table when combining publications. See comments atop
                                926                 :                :          * fetch_table_list. So there should be only one row returned.
                                927                 :                :          * Although we already checked this when creating the subscription, we
                                928                 :                :          * still need to check here in case the column list was changed after
                                929                 :                :          * creating the subscription and before the sync worker is started.
                                930                 :                :          */
 1192 akapila@postgresql.o      931         [ -  + ]:CBC         194 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
 1192 akapila@postgresql.o      932         [ #  # ]:UBC           0 :             ereport(ERROR,
                                933                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                934                 :                :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                                935                 :                :                            nspname, relname));
                                936                 :                : 
                                937                 :                :         /*
                                938                 :                :          * Get the column list and build a single bitmap with the attnums.
                                939                 :                :          *
                                940                 :                :          * If we find a NULL value, it means all the columns should be
                                941                 :                :          * replicated.
                                942                 :                :          */
 1113 drowley@postgresql.o      943                 :CBC         194 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
                                944         [ +  - ]:            194 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
                                945                 :                :         {
                                946                 :            194 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
                                947                 :                : 
 1192 akapila@postgresql.o      948         [ +  + ]:            194 :             if (!isnull)
                                949                 :                :             {
                                950                 :                :                 ArrayType  *arr;
                                951                 :                :                 int         nelems;
                                952                 :                :                 int16      *elems;
                                953                 :                : 
                                954                 :             22 :                 arr = DatumGetArrayTypeP(cfval);
                                955                 :             22 :                 nelems = ARR_DIMS(arr)[0];
                                956         [ -  + ]:             22 :                 elems = (int16 *) ARR_DATA_PTR(arr);
                                957                 :                : 
                                958         [ +  + ]:             59 :                 for (natt = 0; natt < nelems; natt++)
                                959                 :             37 :                     included_cols = bms_add_member(included_cols, elems[natt]);
                                960                 :                :             }
                                961                 :                : 
 1113 drowley@postgresql.o      962                 :            194 :             ExecClearTuple(tslot);
                                963                 :                :         }
                                964                 :            194 :         ExecDropSingleTupleTableSlot(tslot);
                                965                 :                : 
 1260 tomas.vondra@postgre      966                 :            194 :         walrcv_clear_result(pubres);
                                967                 :                :     }
                                968                 :                : 
                                969                 :                :     /*
                                970                 :                :      * Now fetch column names and types.
                                971                 :                :      */
 3089 peter_e@gmx.net           972                 :            194 :     resetStringInfo(&cmd);
  148 drowley@postgresql.o      973                 :            194 :     appendStringInfoString(&cmd,
                                974                 :                :                            "SELECT a.attnum,"
                                975                 :                :                            "       a.attname,"
                                976                 :                :                            "       a.atttypid,"
                                977                 :                :                            "       a.attnum = ANY(i.indkey)");
                                978                 :                : 
                                979                 :                :     /* Generated columns can be replicated since version 18. */
  311 akapila@postgresql.o      980         [ +  - ]:            194 :     if (server_version >= 180000)
  148 drowley@postgresql.o      981                 :            194 :         appendStringInfoString(&cmd, ", a.attgenerated != ''");
                                982                 :                : 
  311 akapila@postgresql.o      983         [ +  - ]:            388 :     appendStringInfo(&cmd,
                                984                 :                :                      "  FROM pg_catalog.pg_attribute a"
                                985                 :                :                      "  LEFT JOIN pg_catalog.pg_index i"
                                986                 :                :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                                987                 :                :                      " WHERE a.attnum > 0::pg_catalog.int2"
                                988                 :                :                      "   AND NOT a.attisdropped %s"
                                989                 :                :                      "   AND a.attrelid = %u"
                                990                 :                :                      " ORDER BY a.attnum",
                                991                 :                :                      lrel->remoteid,
                                992         [ -  + ]:            194 :                      (server_version >= 120000 && server_version < 180000 ?
                                993                 :                :                       "AND a.attgenerated = ''" : ""),
                                994                 :                :                      lrel->remoteid);
 1578 alvherre@alvh.no-ip.      995         [ +  - ]:            194 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                996                 :                :                       server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
                                997                 :                : 
 3089 peter_e@gmx.net           998         [ -  + ]:            194 :     if (res->status != WALRCV_OK_TUPLES)
 3089 peter_e@gmx.net           999         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1000                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1001                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                               1002                 :                :                         nspname, relname, res->err)));
                               1003                 :                : 
                               1004                 :                :     /* We don't know the number of rows coming, so allocate enough space. */
 3089 peter_e@gmx.net          1005                 :CBC         194 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
                               1006                 :            194 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
                               1007                 :            194 :     lrel->attkeys = NULL;
                               1008                 :                : 
                               1009                 :                :     /*
                               1010                 :                :      * Store the columns as a list of names.  Ignore those that are not
                               1011                 :                :      * present in the column list, if there is one.
                               1012                 :                :      */
                               1013                 :            194 :     natt = 0;
 2487 andres@anarazel.de       1014                 :            194 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3089 peter_e@gmx.net          1015         [ +  + ]:            554 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                               1016                 :                :     {
                               1017                 :                :         char       *rel_colname;
                               1018                 :                :         AttrNumber  attnum;
                               1019                 :                : 
 1260 tomas.vondra@postgre     1020                 :            360 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
                               1021         [ -  + ]:            360 :         Assert(!isnull);
                               1022                 :                : 
                               1023                 :                :         /* If the column is not in the column list, skip it. */
                               1024   [ +  +  +  + ]:            360 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
                               1025                 :                :         {
                               1026                 :             31 :             ExecClearTuple(slot);
                               1027                 :             31 :             continue;
                               1028                 :                :         }
                               1029                 :                : 
                               1030                 :            329 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 3089 peter_e@gmx.net          1031         [ -  + ]:            329 :         Assert(!isnull);
                               1032                 :                : 
 1260 tomas.vondra@postgre     1033                 :            329 :         lrel->attnames[natt] = rel_colname;
                               1034                 :            329 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 3089 peter_e@gmx.net          1035         [ -  + ]:            329 :         Assert(!isnull);
                               1036                 :                : 
 1260 tomas.vondra@postgre     1037         [ +  + ]:            329 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 3089 peter_e@gmx.net          1038                 :            108 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
                               1039                 :                : 
                               1040                 :                :         /* Remember if the remote table has published any generated column. */
  311 akapila@postgresql.o     1041   [ +  -  +  - ]:            329 :         if (server_version >= 180000 && !(*gencol_published))
                               1042                 :                :         {
                               1043                 :            329 :             *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
                               1044         [ -  + ]:            329 :             Assert(!isnull);
                               1045                 :                :         }
                               1046                 :                : 
                               1047                 :                :         /* Should never happen. */
 3089 peter_e@gmx.net          1048         [ -  + ]:            329 :         if (++natt >= MaxTupleAttributeNumber)
 3089 peter_e@gmx.net          1049         [ #  # ]:UBC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
                               1050                 :                :                  nspname, relname);
                               1051                 :                : 
 3089 peter_e@gmx.net          1052                 :CBC         329 :         ExecClearTuple(slot);
                               1053                 :                :     }
                               1054                 :            194 :     ExecDropSingleTupleTableSlot(slot);
                               1055                 :                : 
                               1056                 :            194 :     lrel->natts = natt;
                               1057                 :                : 
                               1058                 :            194 :     walrcv_clear_result(res);
                               1059                 :                : 
                               1060                 :                :     /*
                               1061                 :                :      * Get relation's row filter expressions. DISTINCT avoids the same
                               1062                 :                :      * expression of a table in multiple publications from being included
                               1063                 :                :      * multiple times in the final expression.
                               1064                 :                :      *
                               1065                 :                :      * We need to copy the row even if it matches just one of the
                               1066                 :                :      * publications, so we later combine all the quals with OR.
                               1067                 :                :      *
                               1068                 :                :      * For initial synchronization, row filtering can be ignored in following
                               1069                 :                :      * cases:
                               1070                 :                :      *
                               1071                 :                :      * 1) one of the subscribed publications for the table hasn't specified
                               1072                 :                :      * any row filter
                               1073                 :                :      *
                               1074                 :                :      * 2) one of the subscribed publications has puballtables set to true
                               1075                 :                :      *
                               1076                 :                :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
                               1077                 :                :      * that includes this relation
                               1078                 :                :      */
  311 akapila@postgresql.o     1079         [ +  - ]:            194 :     if (server_version >= 150000)
                               1080                 :                :     {
                               1081                 :                :         /* Reuse the already-built pub_names. */
  316 michael@paquier.xyz      1082         [ -  + ]:            194 :         Assert(pub_names != NULL);
                               1083                 :                : 
                               1084                 :                :         /* Check for row filters. */
 1292 akapila@postgresql.o     1085                 :            194 :         resetStringInfo(&cmd);
                               1086                 :            194 :         appendStringInfo(&cmd,
                               1087                 :                :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
                               1088                 :                :                          "  FROM pg_publication p,"
                               1089                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                               1090                 :                :                          " WHERE gpt.relid = %u"
                               1091                 :                :                          "   AND p.pubname IN ( %s )",
                               1092                 :                :                          lrel->remoteid,
                               1093                 :                :                          pub_names->data);
                               1094                 :                : 
                               1095                 :            194 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
                               1096                 :                : 
                               1097         [ -  + ]:            194 :         if (res->status != WALRCV_OK_TUPLES)
 1292 akapila@postgresql.o     1098         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1099                 :                :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                               1100                 :                :                             nspname, relname, res->err)));
                               1101                 :                : 
                               1102                 :                :         /*
                               1103                 :                :          * Multiple row filter expressions for the same table will be combined
                               1104                 :                :          * by COPY using OR. If any of the filter expressions for this table
                               1105                 :                :          * are null, it means the whole table will be copied. In this case it
                               1106                 :                :          * is not necessary to construct a unified row filter expression at
                               1107                 :                :          * all.
                               1108                 :                :          */
 1292 akapila@postgresql.o     1109                 :CBC         194 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
                               1110         [ +  + ]:            209 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                               1111                 :                :         {
                               1112                 :            198 :             Datum       rf = slot_getattr(slot, 1, &isnull);
                               1113                 :                : 
                               1114         [ +  + ]:            198 :             if (!isnull)
                               1115                 :             15 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
                               1116                 :                :             else
                               1117                 :                :             {
                               1118                 :                :                 /* Ignore filters and cleanup as necessary. */
                               1119         [ +  + ]:            183 :                 if (*qual)
                               1120                 :                :                 {
                               1121                 :              3 :                     list_free_deep(*qual);
                               1122                 :              3 :                     *qual = NIL;
                               1123                 :                :                 }
                               1124                 :            183 :                 break;
                               1125                 :                :             }
                               1126                 :                : 
                               1127                 :             15 :             ExecClearTuple(slot);
                               1128                 :                :         }
                               1129                 :            194 :         ExecDropSingleTupleTableSlot(slot);
                               1130                 :                : 
                               1131                 :            194 :         walrcv_clear_result(res);
  316 michael@paquier.xyz      1132                 :            194 :         destroyStringInfo(pub_names);
                               1133                 :                :     }
                               1134                 :                : 
 3089 peter_e@gmx.net          1135                 :            194 :     pfree(cmd.data);
                               1136                 :            194 : }
                               1137                 :                : 
                               1138                 :                : /*
                               1139                 :                :  * Copy existing data of a table from publisher.
                               1140                 :                :  *
                               1141                 :                :  * Caller is responsible for locking the local relation.
                               1142                 :                :  */
                               1143                 :                : static void
                               1144                 :            194 : copy_table(Relation rel)
                               1145                 :                : {
                               1146                 :                :     LogicalRepRelMapEntry *relmapentry;
                               1147                 :                :     LogicalRepRelation lrel;
 1292 akapila@postgresql.o     1148                 :            194 :     List       *qual = NIL;
                               1149                 :                :     WalRcvExecResult *res;
                               1150                 :                :     StringInfoData cmd;
                               1151                 :                :     CopyFromState cstate;
                               1152                 :                :     List       *attnamelist;
                               1153                 :                :     ParseState *pstate;
  898                          1154                 :            194 :     List       *options = NIL;
  311                          1155                 :            194 :     bool        gencol_published = false;
                               1156                 :                : 
                               1157                 :                :     /* Get the publisher relation info. */
 3089 peter_e@gmx.net          1158                 :            194 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
  311 akapila@postgresql.o     1159                 :            194 :                             RelationGetRelationName(rel), &lrel, &qual,
                               1160                 :                :                             &gencol_published);
                               1161                 :                : 
                               1162                 :                :     /* Put the relation into relmap. */
 3089 peter_e@gmx.net          1163                 :            194 :     logicalrep_relmap_update(&lrel);
                               1164                 :                : 
                               1165                 :                :     /* Map the publisher relation to local one. */
                               1166                 :            194 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
                               1167         [ -  + ]:            192 :     Assert(rel == relmapentry->localrel);
                               1168                 :                : 
                               1169                 :                :     /* Start copy on the publisher. */
                               1170                 :            192 :     initStringInfo(&cmd);
                               1171                 :                : 
                               1172                 :                :     /* Regular table with no row filter or generated columns */
  311 akapila@postgresql.o     1173   [ +  +  +  +  :            192 :     if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
                                              +  + ]
                               1174                 :                :     {
  654                          1175                 :            164 :         appendStringInfo(&cmd, "COPY %s",
 1997 peter@eisentraut.org     1176                 :            164 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1177                 :                : 
                               1178                 :                :         /* If the table has columns, then specify the columns */
  654 akapila@postgresql.o     1179         [ +  + ]:            164 :         if (lrel.natts)
                               1180                 :                :         {
                               1181                 :            163 :             appendStringInfoString(&cmd, " (");
                               1182                 :                : 
                               1183                 :                :             /*
                               1184                 :                :              * XXX Do we need to list the columns in all cases? Maybe we're
                               1185                 :                :              * replicating all columns?
                               1186                 :                :              */
                               1187         [ +  + ]:            444 :             for (int i = 0; i < lrel.natts; i++)
                               1188                 :                :             {
                               1189         [ +  + ]:            281 :                 if (i > 0)
                               1190                 :            118 :                     appendStringInfoString(&cmd, ", ");
                               1191                 :                : 
                               1192                 :            281 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1193                 :                :             }
                               1194                 :                : 
  514 drowley@postgresql.o     1195                 :            163 :             appendStringInfoChar(&cmd, ')');
                               1196                 :                :         }
                               1197                 :                : 
  654 akapila@postgresql.o     1198                 :            164 :         appendStringInfoString(&cmd, " TO STDOUT");
                               1199                 :                :     }
                               1200                 :                :     else
                               1201                 :                :     {
                               1202                 :                :         /*
                               1203                 :                :          * For non-tables and tables with row filters, we need to do COPY
                               1204                 :                :          * (SELECT ...), but we can't just do SELECT * because we may need to
                               1205                 :                :          * copy only subset of columns including generated columns. For tables
                               1206                 :                :          * with any row filters, build a SELECT query with OR'ed row filters
                               1207                 :                :          * for COPY.
                               1208                 :                :          *
                               1209                 :                :          * We also need to use this same COPY (SELECT ...) syntax when
                               1210                 :                :          * generated columns are published, because copy of generated columns
                               1211                 :                :          * is not supported by the normal COPY.
                               1212                 :                :          */
 1787 drowley@postgresql.o     1213                 :             28 :         appendStringInfoString(&cmd, "COPY (SELECT ");
 1997 peter@eisentraut.org     1214         [ +  + ]:             70 :         for (int i = 0; i < lrel.natts; i++)
                               1215                 :                :         {
                               1216                 :             42 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1217         [ +  + ]:             42 :             if (i < lrel.natts - 1)
                               1218                 :             14 :                 appendStringInfoString(&cmd, ", ");
                               1219                 :                :         }
                               1220                 :                : 
 1292 akapila@postgresql.o     1221                 :             28 :         appendStringInfoString(&cmd, " FROM ");
                               1222                 :                : 
                               1223                 :                :         /*
                               1224                 :                :          * For regular tables, make sure we don't copy data from a child that
                               1225                 :                :          * inherits the named table as those will be copied separately.
                               1226                 :                :          */
                               1227         [ +  + ]:             28 :         if (lrel.relkind == RELKIND_RELATION)
                               1228                 :             11 :             appendStringInfoString(&cmd, "ONLY ");
                               1229                 :                : 
                               1230                 :             28 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1231                 :                :         /* list of OR'ed filters */
                               1232         [ +  + ]:             28 :         if (qual != NIL)
                               1233                 :                :         {
                               1234                 :                :             ListCell   *lc;
                               1235                 :             11 :             char       *q = strVal(linitial(qual));
                               1236                 :                : 
                               1237                 :             11 :             appendStringInfo(&cmd, " WHERE %s", q);
                               1238   [ +  -  +  +  :             12 :             for_each_from(lc, qual, 1)
                                              +  + ]
                               1239                 :                :             {
                               1240                 :              1 :                 q = strVal(lfirst(lc));
                               1241                 :              1 :                 appendStringInfo(&cmd, " OR %s", q);
                               1242                 :                :             }
                               1243                 :             11 :             list_free_deep(qual);
                               1244                 :                :         }
                               1245                 :                : 
                               1246                 :             28 :         appendStringInfoString(&cmd, ") TO STDOUT");
                               1247                 :                :     }
                               1248                 :                : 
                               1249                 :                :     /*
                               1250                 :                :      * Prior to v16, initial table synchronization will use text format even
                               1251                 :                :      * if the binary option is enabled for a subscription.
                               1252                 :                :      */
  898                          1253         [ +  - ]:            192 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
                               1254         [ +  + ]:            192 :         MySubscription->binary)
                               1255                 :                :     {
                               1256                 :              5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
                               1257                 :              5 :         options = list_make1(makeDefElem("format",
                               1258                 :                :                                          (Node *) makeString("binary"), -1));
                               1259                 :                :     }
                               1260                 :                : 
 1578 alvherre@alvh.no-ip.     1261                 :            192 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 3089 peter_e@gmx.net          1262                 :            192 :     pfree(cmd.data);
                               1263         [ -  + ]:            192 :     if (res->status != WALRCV_OK_COPY_OUT)
 3089 peter_e@gmx.net          1264         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1265                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1266                 :                :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                               1267                 :                :                         lrel.nspname, lrel.relname, res->err)));
 3089 peter_e@gmx.net          1268                 :CBC         192 :     walrcv_clear_result(res);
                               1269                 :                : 
                               1270                 :            192 :     copybuf = makeStringInfo();
                               1271                 :                : 
 3064                          1272                 :            192 :     pstate = make_parsestate(NULL);
 2074 tgl@sss.pgh.pa.us        1273                 :            192 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                               1274                 :                :                                          NULL, false, false);
                               1275                 :                : 
 3089 peter_e@gmx.net          1276                 :            192 :     attnamelist = make_copy_attnamelist(relmapentry);
  898 akapila@postgresql.o     1277                 :            192 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
                               1278                 :                : 
                               1279                 :                :     /* Do the copy */
 3089 peter_e@gmx.net          1280                 :            191 :     (void) CopyFrom(cstate);
                               1281                 :                : 
                               1282                 :            183 :     logicalrep_rel_close(relmapentry, NoLock);
                               1283                 :            183 : }
                               1284                 :                : 
                               1285                 :                : /*
                               1286                 :                :  * Determine the tablesync slot name.
                               1287                 :                :  *
                               1288                 :                :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
                               1289                 :                :  * on slot name length. We append system_identifier to avoid slot_name
                               1290                 :                :  * collision with subscriptions in other clusters. With the current scheme
                               1291                 :                :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
                               1292                 :                :  * length of slot_name will be 50.
                               1293                 :                :  *
                               1294                 :                :  * The returned slot name is stored in the supplied buffer (syncslotname) with
                               1295                 :                :  * the given size.
                               1296                 :                :  *
                               1297                 :                :  * Note: We don't use the subscription slot name as part of tablesync slot name
                               1298                 :                :  * because we are responsible for cleaning up these slots and it could become
                               1299                 :                :  * impossible to recalculate what name to cleanup if the subscription slot name
                               1300                 :                :  * had changed.
                               1301                 :                :  */
                               1302                 :                : void
 1667 akapila@postgresql.o     1303                 :            383 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                               1304                 :                :                                 char *syncslotname, Size szslot)
                               1305                 :                : {
 1664                          1306                 :            383 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
                               1307                 :                :              relid, GetSystemIdentifier());
 1667                          1308                 :            383 : }
                               1309                 :                : 
                               1310                 :                : /*
                               1311                 :                :  * Start syncing the table in the sync worker.
                               1312                 :                :  *
                               1313                 :                :  * If nothing needs to be done to sync the table, we exit the worker without
                               1314                 :                :  * any further action.
                               1315                 :                :  *
                               1316                 :                :  * The returned slot name is palloc'ed in current memory context.
                               1317                 :                :  */
                               1318                 :                : static char *
 3089 peter_e@gmx.net          1319                 :            195 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                               1320                 :                : {
                               1321                 :                :     char       *slotname;
                               1322                 :                :     char       *err;
                               1323                 :                :     char        relstate;
                               1324                 :                :     XLogRecPtr  relstate_lsn;
                               1325                 :                :     Relation    rel;
                               1326                 :                :     AclResult   aclresult;
                               1327                 :                :     WalRcvExecResult *res;
                               1328                 :                :     char        originname[NAMEDATALEN];
                               1329                 :                :     RepOriginId originid;
                               1330                 :                :     UserContext ucxt;
                               1331                 :                :     bool        must_use_password;
                               1332                 :                :     bool        run_as_owner;
                               1333                 :                : 
                               1334                 :                :     /* Check the state of the table synchronization. */
                               1335                 :            195 :     StartTransactionCommand();
 3061 fujii@postgresql.org     1336                 :            195 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                               1337                 :            195 :                                        MyLogicalRepWorker->relid,
                               1338                 :                :                                        &relstate_lsn);
  690 akapila@postgresql.o     1339                 :            195 :     CommitTransactionCommand();
                               1340                 :                : 
                               1341                 :                :     /* Is the use of a password mandatory? */
  814                          1342         [ +  + ]:            386 :     must_use_password = MySubscription->passwordrequired &&
  690                          1343         [ -  + ]:            191 :         !MySubscription->ownersuperuser;
                               1344                 :                : 
 3089 peter_e@gmx.net          1345         [ -  + ]:            195 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 3061 fujii@postgresql.org     1346                 :            195 :     MyLogicalRepWorker->relstate = relstate;
                               1347                 :            195 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 3089 peter_e@gmx.net          1348                 :            195 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1349                 :                : 
                               1350                 :                :     /*
                               1351                 :                :      * If synchronization is already done or no longer necessary, exit now
                               1352                 :                :      * that we've updated shared memory state.
                               1353                 :                :      */
 1787 alvherre@alvh.no-ip.     1354         [ -  + ]:            195 :     switch (relstate)
                               1355                 :                :     {
 1787 alvherre@alvh.no-ip.     1356                 :UBC           0 :         case SUBREL_STATE_SYNCDONE:
                               1357                 :                :         case SUBREL_STATE_READY:
                               1358                 :                :         case SUBREL_STATE_UNKNOWN:
                               1359                 :              0 :             finish_sync_worker();   /* doesn't return */
                               1360                 :                :     }
                               1361                 :                : 
                               1362                 :                :     /* Calculate the name of the tablesync slot. */
 1664 akapila@postgresql.o     1363                 :CBC         195 :     slotname = (char *) palloc(NAMEDATALEN);
                               1364                 :            195 :     ReplicationSlotNameForTablesync(MySubscription->oid,
                               1365                 :            195 :                                     MyLogicalRepWorker->relid,
                               1366                 :                :                                     slotname,
                               1367                 :                :                                     NAMEDATALEN);
                               1368                 :                : 
                               1369                 :                :     /*
                               1370                 :                :      * Here we use the slot name instead of the subscription name as the
                               1371                 :                :      * application_name, so that it is different from the leader apply worker,
                               1372                 :                :      * so that synchronous replication can distinguish them.
                               1373                 :                :      */
 1578 alvherre@alvh.no-ip.     1374                 :            194 :     LogRepWorkerWalRcvConn =
  579 akapila@postgresql.o     1375                 :            195 :         walrcv_connect(MySubscription->conninfo, true, true,
                               1376                 :                :                        must_use_password,
                               1377                 :                :                        slotname, &err);
 1578 alvherre@alvh.no-ip.     1378         [ -  + ]:            194 :     if (LogRepWorkerWalRcvConn == NULL)
 3089 peter_e@gmx.net          1379         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1380                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1381                 :                :                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
                               1382                 :                :                         MySubscription->name, err)));
                               1383                 :                : 
 1787 alvherre@alvh.no-ip.     1384   [ +  +  -  +  :CBC         194 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                                              -  - ]
                               1385                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
                               1386                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
                               1387                 :                : 
                               1388                 :                :     /* Assign the origin tracking record name. */
 1061 akapila@postgresql.o     1389                 :            194 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1390                 :            194 :                                        MyLogicalRepWorker->relid,
                               1391                 :                :                                        originname,
                               1392                 :                :                                        sizeof(originname));
                               1393                 :                : 
 1667                          1394         [ +  + ]:            194 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
                               1395                 :                :     {
                               1396                 :                :         /*
                               1397                 :                :          * We have previously errored out before finishing the copy so the
                               1398                 :                :          * replication slot might exist. We want to remove the slot if it
                               1399                 :                :          * already exists and proceed.
                               1400                 :                :          *
                               1401                 :                :          * XXX We could also instead try to drop the slot, last time we failed
                               1402                 :                :          * but for that, we might need to clean up the copy state as it might
                               1403                 :                :          * be in the middle of fetching the rows. Also, if there is a network
                               1404                 :                :          * breakdown then it wouldn't have succeeded so trying it next time
                               1405                 :                :          * seems like a better bet.
                               1406                 :                :          */
 1578 alvherre@alvh.no-ip.     1407                 :              7 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
                               1408                 :                :     }
 1667 akapila@postgresql.o     1409         [ -  + ]:            187 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
                               1410                 :                :     {
                               1411                 :                :         /*
                               1412                 :                :          * The COPY phase was previously done, but tablesync then crashed
                               1413                 :                :          * before it was able to finish normally.
                               1414                 :                :          */
 1667 akapila@postgresql.o     1415                 :UBC           0 :         StartTransactionCommand();
                               1416                 :                : 
                               1417                 :                :         /*
                               1418                 :                :          * The origin tracking name must already exist. It was created first
                               1419                 :                :          * time this tablesync was launched.
                               1420                 :                :          */
                               1421                 :              0 :         originid = replorigin_by_name(originname, false);
  971                          1422                 :              0 :         replorigin_session_setup(originid, 0);
 1667                          1423                 :              0 :         replorigin_session_origin = originid;
                               1424                 :              0 :         *origin_startpos = replorigin_session_get_progress(false);
                               1425                 :                : 
                               1426                 :              0 :         CommitTransactionCommand();
                               1427                 :                : 
                               1428                 :              0 :         goto copy_table_done;
                               1429                 :                :     }
                               1430                 :                : 
 1787 alvherre@alvh.no-ip.     1431         [ -  + ]:CBC         194 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1432                 :            194 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                               1433                 :            194 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                               1434                 :            194 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1435                 :                : 
                               1436                 :                :     /* Update the state and make it visible to others. */
                               1437                 :            194 :     StartTransactionCommand();
                               1438                 :            194 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1439                 :            194 :                                MyLogicalRepWorker->relid,
                               1440                 :            194 :                                MyLogicalRepWorker->relstate,
   36 akapila@postgresql.o     1441                 :            194 :                                MyLogicalRepWorker->relstate_lsn,
                               1442                 :                :                                false);
 1787 alvherre@alvh.no-ip.     1443                 :            194 :     CommitTransactionCommand();
 1249 andres@anarazel.de       1444                 :            194 :     pgstat_report_stat(true);
                               1445                 :                : 
 1787 alvherre@alvh.no-ip.     1446                 :            194 :     StartTransactionCommand();
                               1447                 :                : 
                               1448                 :                :     /*
                               1449                 :                :      * Use a standard write lock here. It might be better to disallow access
                               1450                 :                :      * to the table while it's being synchronized. But we don't want to block
                               1451                 :                :      * the main apply process from working and it has to open the relation in
                               1452                 :                :      * RowExclusiveLock when remapping remote relation id to local one.
                               1453                 :                :      */
                               1454                 :            194 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
                               1455                 :                : 
                               1456                 :                :     /*
                               1457                 :                :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
                               1458                 :                :      * ensures that both the replication slot we create (see below) and the
                               1459                 :                :      * COPY are consistent with each other.
                               1460                 :                :      */
 1578                          1461                 :            194 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
                               1462                 :                :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                               1463                 :                :                       0, NULL);
 1787                          1464         [ -  + ]:            194 :     if (res->status != WALRCV_OK_COMMAND)
 1787 alvherre@alvh.no-ip.     1465         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1466                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1467                 :                :                  errmsg("table copy could not start transaction on publisher: %s",
                               1468                 :                :                         res->err)));
 1787 alvherre@alvh.no-ip.     1469                 :CBC         194 :     walrcv_clear_result(res);
                               1470                 :                : 
                               1471                 :                :     /*
                               1472                 :                :      * Create a new permanent logical decoding slot. This slot will be used
                               1473                 :                :      * for the catchup phase after COPY is done, so tell it to use the
                               1474                 :                :      * snapshot to make the final data consistent.
                               1475                 :                :      */
 1515 akapila@postgresql.o     1476                 :            194 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
                               1477                 :                :                        slotname, false /* permanent */ , false /* two_phase */ ,
                               1478                 :                :                        MySubscription->failover,
                               1479                 :                :                        CRS_USE_SNAPSHOT, origin_startpos);
                               1480                 :                : 
                               1481                 :                :     /*
                               1482                 :                :      * Setup replication origin tracking. The purpose of doing this before the
                               1483                 :                :      * copy is to avoid doing the copy again due to any error in setting up
                               1484                 :                :      * origin tracking.
                               1485                 :                :      */
 1667                          1486                 :            194 :     originid = replorigin_by_name(originname, true);
                               1487         [ +  - ]:            194 :     if (!OidIsValid(originid))
                               1488                 :                :     {
                               1489                 :                :         /*
                               1490                 :                :          * Origin tracking does not exist, so create it now.
                               1491                 :                :          *
                               1492                 :                :          * Then advance to the LSN got from walrcv_create_slot. This is WAL
                               1493                 :                :          * logged for the purpose of recovery. Locks are to prevent the
                               1494                 :                :          * replication origin from vanishing while advancing.
                               1495                 :                :          */
                               1496                 :            194 :         originid = replorigin_create(originname);
                               1497                 :                : 
                               1498                 :            194 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1499                 :            194 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                               1500                 :                :                            true /* go backward */ , true /* WAL log */ );
                               1501                 :            194 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1502                 :                : 
  971                          1503                 :            194 :         replorigin_session_setup(originid, 0);
 1667                          1504                 :            194 :         replorigin_session_origin = originid;
                               1505                 :                :     }
                               1506                 :                :     else
                               1507                 :                :     {
 1667 akapila@postgresql.o     1508         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1509                 :                :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1510                 :                :                  errmsg("replication origin \"%s\" already exists",
                               1511                 :                :                         originname)));
                               1512                 :                :     }
                               1513                 :                : 
                               1514                 :                :     /*
                               1515                 :                :      * Make sure that the copy command runs as the table owner, unless the
                               1516                 :                :      * user has opted out of that behaviour.
                               1517                 :                :      */
  820 msawada@postgresql.o     1518                 :CBC         194 :     run_as_owner = MySubscription->runasowner;
                               1519         [ +  + ]:            194 :     if (!run_as_owner)
                               1520                 :            193 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
                               1521                 :                : 
                               1522                 :                :     /*
                               1523                 :                :      * Check that our table sync worker has permission to insert into the
                               1524                 :                :      * target table.
                               1525                 :                :      */
                               1526                 :            194 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               1527                 :                :                                   ACL_INSERT);
                               1528         [ -  + ]:            194 :     if (aclresult != ACLCHECK_OK)
  820 msawada@postgresql.o     1529                 :UBC           0 :         aclcheck_error(aclresult,
                               1530                 :              0 :                        get_relkind_objtype(rel->rd_rel->relkind),
                               1531                 :              0 :                        RelationGetRelationName(rel));
                               1532                 :                : 
                               1533                 :                :     /*
                               1534                 :                :      * COPY FROM does not honor RLS policies.  That is not a problem for
                               1535                 :                :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
                               1536                 :                :      * who has it implicitly), but other roles should not be able to
                               1537                 :                :      * circumvent RLS.  Disallow logical replication into RLS enabled
                               1538                 :                :      * relations for such roles.
                               1539                 :                :      */
  820 msawada@postgresql.o     1540         [ -  + ]:CBC         194 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
  820 msawada@postgresql.o     1541         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1542                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1543                 :                :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                               1544                 :                :                         GetUserNameFromId(GetUserId(), true),
                               1545                 :                :                         RelationGetRelationName(rel))));
                               1546                 :                : 
                               1547                 :                :     /* Now do the initial data copy */
 1248 tomas.vondra@postgre     1548                 :CBC         194 :     PushActiveSnapshot(GetTransactionSnapshot());
                               1549                 :            194 :     copy_table(rel);
                               1550                 :            183 :     PopActiveSnapshot();
                               1551                 :                : 
 1578 alvherre@alvh.no-ip.     1552                 :            183 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 1787                          1553         [ -  + ]:            183 :     if (res->status != WALRCV_OK_COMMAND)
 1787 alvherre@alvh.no-ip.     1554         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1555                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1556                 :                :                  errmsg("table copy could not finish transaction on publisher: %s",
                               1557                 :                :                         res->err)));
 1787 alvherre@alvh.no-ip.     1558                 :CBC         183 :     walrcv_clear_result(res);
                               1559                 :                : 
  809 tgl@sss.pgh.pa.us        1560         [ +  + ]:            183 :     if (!run_as_owner)
  820 msawada@postgresql.o     1561                 :            182 :         RestoreUserContext(&ucxt);
                               1562                 :                : 
 1787 alvherre@alvh.no-ip.     1563                 :            183 :     table_close(rel, NoLock);
                               1564                 :                : 
                               1565                 :                :     /* Make the copy visible. */
                               1566                 :            183 :     CommandCounterIncrement();
                               1567                 :                : 
                               1568                 :                :     /*
                               1569                 :                :      * Update the persisted state to indicate the COPY phase is done; make it
                               1570                 :                :      * visible to others.
                               1571                 :                :      */
 1667 akapila@postgresql.o     1572                 :            183 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1573                 :            183 :                                MyLogicalRepWorker->relid,
                               1574                 :                :                                SUBREL_STATE_FINISHEDCOPY,
   36                          1575                 :            183 :                                MyLogicalRepWorker->relstate_lsn,
                               1576                 :                :                                false);
                               1577                 :                : 
 1667                          1578                 :            183 :     CommitTransactionCommand();
                               1579                 :                : 
                               1580                 :            183 : copy_table_done:
                               1581                 :                : 
                               1582         [ -  + ]:            183 :     elog(DEBUG1,
                               1583                 :                :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
                               1584                 :                :          originname, LSN_FORMAT_ARGS(*origin_startpos));
                               1585                 :                : 
                               1586                 :                :     /*
                               1587                 :                :      * We are done with the initial data synchronization, update the state.
                               1588                 :                :      */
 1787 alvherre@alvh.no-ip.     1589         [ -  + ]:            183 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1590                 :            183 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                               1591                 :            183 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                               1592                 :            183 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1593                 :                : 
                               1594                 :                :     /*
                               1595                 :                :      * Finally, wait until the leader apply worker tells us to catch up and
                               1596                 :                :      * then return to let LogicalRepApplyLoop do it.
                               1597                 :                :      */
                               1598                 :            183 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 3089 peter_e@gmx.net          1599                 :            183 :     return slotname;
                               1600                 :                : }
                               1601                 :                : 
                               1602                 :                : /*
                               1603                 :                :  * Common code to fetch the up-to-date sync state info into the static lists.
                               1604                 :                :  *
                               1605                 :                :  * Returns true if subscription has 1 or more tables, else false.
                               1606                 :                :  *
                               1607                 :                :  * Note: If this function started the transaction (indicated by the parameter)
                               1608                 :                :  * then it is the caller's responsibility to commit it.
                               1609                 :                :  */
                               1610                 :                : static bool
 1515 akapila@postgresql.o     1611                 :           3684 : FetchTableStates(bool *started_tx)
                               1612                 :                : {
                               1613                 :                :     static bool has_subrels = false;
                               1614                 :                : 
                               1615                 :           3684 :     *started_tx = false;
                               1616                 :                : 
  499                          1617         [ +  + ]:           3684 :     if (table_states_validity != SYNC_TABLE_STATE_VALID)
                               1618                 :                :     {
                               1619                 :                :         MemoryContext oldctx;
                               1620                 :                :         List       *rstates;
                               1621                 :                :         ListCell   *lc;
                               1622                 :                :         SubscriptionRelState *rstate;
                               1623                 :                : 
                               1624                 :            887 :         table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
                               1625                 :                : 
                               1626                 :                :         /* Clean the old lists. */
 1515                          1627                 :            887 :         list_free_deep(table_states_not_ready);
                               1628                 :            887 :         table_states_not_ready = NIL;
                               1629                 :                : 
                               1630         [ +  + ]:            887 :         if (!IsTransactionState())
                               1631                 :                :         {
                               1632                 :            871 :             StartTransactionCommand();
                               1633                 :            871 :             *started_tx = true;
                               1634                 :                :         }
                               1635                 :                : 
                               1636                 :                :         /* Fetch all non-ready tables. */
 1137 michael@paquier.xyz      1637                 :            887 :         rstates = GetSubscriptionRelations(MySubscription->oid, true);
                               1638                 :                : 
                               1639                 :                :         /* Allocate the tracking info in a permanent memory context. */
 1515 akapila@postgresql.o     1640                 :            887 :         oldctx = MemoryContextSwitchTo(CacheMemoryContext);
                               1641   [ +  +  +  +  :           2428 :         foreach(lc, rstates)
                                              +  + ]
                               1642                 :                :         {
                               1643                 :           1541 :             rstate = palloc(sizeof(SubscriptionRelState));
                               1644                 :           1541 :             memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
                               1645                 :           1541 :             table_states_not_ready = lappend(table_states_not_ready, rstate);
                               1646                 :                :         }
                               1647                 :            887 :         MemoryContextSwitchTo(oldctx);
                               1648                 :                : 
                               1649                 :                :         /*
                               1650                 :                :          * Does the subscription have tables?
                               1651                 :                :          *
                               1652                 :                :          * If there were not-READY relations found then we know it does. But
                               1653                 :                :          * if table_states_not_ready was empty we still need to check again to
                               1654                 :                :          * see if there are 0 tables.
                               1655                 :                :          */
 1116 tgl@sss.pgh.pa.us        1656   [ +  +  +  + ]:           1128 :         has_subrels = (table_states_not_ready != NIL) ||
 1515 akapila@postgresql.o     1657                 :            241 :             HasSubscriptionRelations(MySubscription->oid);
                               1658                 :                : 
                               1659                 :                :         /*
                               1660                 :                :          * If the subscription relation cache has been invalidated since we
                               1661                 :                :          * entered this routine, we still use and return the relations we just
                               1662                 :                :          * finished constructing, to avoid infinite loops, but we leave the
                               1663                 :                :          * table states marked as stale so that we'll rebuild it again on next
                               1664                 :                :          * access. Otherwise, we mark the table states as valid.
                               1665                 :                :          */
  499                          1666         [ +  - ]:            887 :         if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
                               1667                 :            887 :             table_states_validity = SYNC_TABLE_STATE_VALID;
                               1668                 :                :     }
                               1669                 :                : 
 1515                          1670                 :           3684 :     return has_subrels;
                               1671                 :                : }
                               1672                 :                : 
                               1673                 :                : /*
                               1674                 :                :  * Execute the initial sync with error handling. Disable the subscription,
                               1675                 :                :  * if it's required.
                               1676                 :                :  *
                               1677                 :                :  * Allocate the slot name in long-lived context on return. Note that we don't
                               1678                 :                :  * handle FATAL errors which are probably because of system resource error and
                               1679                 :                :  * are not repeatable.
                               1680                 :                :  */
                               1681                 :                : static void
  765                          1682                 :            195 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
                               1683                 :                : {
                               1684                 :            195 :     char       *sync_slotname = NULL;
                               1685                 :                : 
                               1686         [ -  + ]:            195 :     Assert(am_tablesync_worker());
                               1687                 :                : 
                               1688         [ +  + ]:            195 :     PG_TRY();
                               1689                 :                :     {
                               1690                 :                :         /* Call initial sync. */
                               1691                 :            195 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
                               1692                 :                :     }
                               1693                 :             11 :     PG_CATCH();
                               1694                 :                :     {
                               1695         [ +  + ]:             11 :         if (MySubscription->disableonerr)
                               1696                 :              1 :             DisableSubscriptionAndExit();
                               1697                 :                :         else
                               1698                 :                :         {
                               1699                 :                :             /*
                               1700                 :                :              * Report the worker failed during table synchronization. Abort
                               1701                 :                :              * the current transaction so that the stats message is sent in an
                               1702                 :                :              * idle state.
                               1703                 :                :              */
                               1704                 :             10 :             AbortOutOfAnyTransaction();
                               1705                 :             10 :             pgstat_report_subscription_error(MySubscription->oid, false);
                               1706                 :                : 
                               1707                 :             10 :             PG_RE_THROW();
                               1708                 :                :         }
                               1709                 :                :     }
                               1710         [ -  + ]:            183 :     PG_END_TRY();
                               1711                 :                : 
                               1712                 :                :     /* allocate slot name in long-lived context */
                               1713                 :            183 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
                               1714                 :            183 :     pfree(sync_slotname);
                               1715                 :            183 : }
                               1716                 :                : 
                               1717                 :                : /*
                               1718                 :                :  * Runs the tablesync worker.
                               1719                 :                :  *
                               1720                 :                :  * It starts syncing tables. After a successful sync, sets streaming options
                               1721                 :                :  * and starts streaming to catchup with apply worker.
                               1722                 :                :  */
                               1723                 :                : static void
                               1724                 :            195 : run_tablesync_worker()
                               1725                 :                : {
                               1726                 :                :     char        originname[NAMEDATALEN];
                               1727                 :            195 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
                               1728                 :            195 :     char       *slotname = NULL;
                               1729                 :                :     WalRcvStreamOptions options;
                               1730                 :                : 
                               1731                 :            195 :     start_table_sync(&origin_startpos, &slotname);
                               1732                 :                : 
                               1733                 :            183 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1734                 :            183 :                                        MyLogicalRepWorker->relid,
                               1735                 :                :                                        originname,
                               1736                 :                :                                        sizeof(originname));
                               1737                 :                : 
                               1738                 :            183 :     set_apply_error_context_origin(originname);
                               1739                 :                : 
                               1740                 :            183 :     set_stream_options(&options, slotname, &origin_startpos);
                               1741                 :                : 
                               1742                 :            183 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
                               1743                 :                : 
                               1744                 :                :     /* Apply the changes till we catchup with the apply worker. */
                               1745                 :            183 :     start_apply(origin_startpos);
  765 akapila@postgresql.o     1746                 :UBC           0 : }
                               1747                 :                : 
                               1748                 :                : /* Logical Replication Tablesync worker entry point */
                               1749                 :                : void
  765 akapila@postgresql.o     1750                 :CBC         198 : TablesyncWorkerMain(Datum main_arg)
                               1751                 :                : {
                               1752                 :            198 :     int         worker_slot = DatumGetInt32(main_arg);
                               1753                 :                : 
                               1754                 :            198 :     SetupApplyOrSyncWorker(worker_slot);
                               1755                 :                : 
                               1756                 :            195 :     run_tablesync_worker();
                               1757                 :                : 
  765 akapila@postgresql.o     1758                 :UBC           0 :     finish_sync_worker();
                               1759                 :                : }
                               1760                 :                : 
                               1761                 :                : /*
                               1762                 :                :  * If the subscription has no tables then return false.
                               1763                 :                :  *
                               1764                 :                :  * Otherwise, are all tablesyncs READY?
                               1765                 :                :  *
                               1766                 :                :  * Note: This function is not suitable to be called from outside of apply or
                               1767                 :                :  * tablesync workers because MySubscription needs to be already initialized.
                               1768                 :                :  */
                               1769                 :                : bool
 1515 akapila@postgresql.o     1770                 :CBC         125 : AllTablesyncsReady(void)
                               1771                 :                : {
                               1772                 :            125 :     bool        started_tx = false;
                               1773                 :            125 :     bool        has_subrels = false;
                               1774                 :                : 
                               1775                 :                :     /* We need up-to-date sync state info for subscription tables here. */
                               1776                 :            125 :     has_subrels = FetchTableStates(&started_tx);
                               1777                 :                : 
                               1778         [ +  + ]:            125 :     if (started_tx)
                               1779                 :                :     {
                               1780                 :             18 :         CommitTransactionCommand();
 1249 andres@anarazel.de       1781                 :             18 :         pgstat_report_stat(true);
                               1782                 :                :     }
                               1783                 :                : 
                               1784                 :                :     /*
                               1785                 :                :      * Return false when there are no tables in subscription or not all tables
                               1786                 :                :      * are in ready state; true otherwise.
                               1787                 :                :      */
 1116 tgl@sss.pgh.pa.us        1788   [ +  -  +  + ]:            125 :     return has_subrels && (table_states_not_ready == NIL);
                               1789                 :                : }
                               1790                 :                : 
                               1791                 :                : /*
                               1792                 :                :  * Update the two_phase state of the specified subscription in pg_subscription.
                               1793                 :                :  */
                               1794                 :                : void
 1515 akapila@postgresql.o     1795                 :             10 : UpdateTwoPhaseState(Oid suboid, char new_state)
                               1796                 :                : {
                               1797                 :                :     Relation    rel;
                               1798                 :                :     HeapTuple   tup;
                               1799                 :                :     bool        nulls[Natts_pg_subscription];
                               1800                 :                :     bool        replaces[Natts_pg_subscription];
                               1801                 :                :     Datum       values[Natts_pg_subscription];
                               1802                 :                : 
                               1803   [ +  -  +  -  :             10 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
                                              -  + ]
                               1804                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
                               1805                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
                               1806                 :                : 
                               1807                 :             10 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                               1808                 :             10 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
                               1809         [ -  + ]:             10 :     if (!HeapTupleIsValid(tup))
 1515 akapila@postgresql.o     1810         [ #  # ]:UBC           0 :         elog(ERROR,
                               1811                 :                :              "cache lookup failed for subscription oid %u",
                               1812                 :                :              suboid);
                               1813                 :                : 
                               1814                 :                :     /* Form a new tuple. */
 1515 akapila@postgresql.o     1815                 :CBC          10 :     memset(values, 0, sizeof(values));
                               1816                 :             10 :     memset(nulls, false, sizeof(nulls));
                               1817                 :             10 :     memset(replaces, false, sizeof(replaces));
                               1818                 :                : 
                               1819                 :                :     /* And update/set two_phase state */
                               1820                 :             10 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
                               1821                 :             10 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
                               1822                 :                : 
                               1823                 :             10 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                               1824                 :                :                             values, nulls, replaces);
                               1825                 :             10 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1826                 :                : 
                               1827                 :             10 :     heap_freetuple(tup);
                               1828                 :             10 :     table_close(rel, RowExclusiveLock);
                               1829                 :             10 : }
        

Generated by: LCOV version 2.4-beta