LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - tablesync.c (source / functions) Coverage Total Hit UNC LBC UBC GBC GNC CBC DUB DCB
Current: b45a8d7d8b306b43f31a002f1b3f1dddc8defeaf vs 8767b449a3a1e75626dfb08f24da54933171d4c5 Lines: 91.5 % 507 464 2 2 39 1 13 450 4 52
Current Date: 2025-10-28 08:26:42 +0900 Functions: 100.0 % 16 16 7 9 7
Baseline: lcov-20251028-005825-baseline Branches: 65.0 % 300 195 1 104 2 2 191
Baseline Date: 2025-10-27 06:37:35 +0000 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 81.8 % 11 9 2 9
(30,360] days: 100.0 % 24 24 4 20
(360..) days: 91.3 % 472 431 2 39 1 430
Function coverage date bins:
(7,30] days: 100.0 % 4 4 4
(360..) days: 100.0 % 12 12 3 9
Branch coverage date bins:
(30,360] days: 87.5 % 8 7 1 2 5
(360..) days: 64.4 % 292 188 1 103 2 186

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * tablesync.c
                                  3                 :                :  *    PostgreSQL logical replication: initial table data synchronization
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/tablesync.c
                                  9                 :                :  *
                                 10                 :                :  * NOTES
                                 11                 :                :  *    This file contains code for initial table data synchronization for
                                 12                 :                :  *    logical replication.
                                 13                 :                :  *
                                 14                 :                :  *    The initial data synchronization is done separately for each table,
                                 15                 :                :  *    in a separate apply worker that only fetches the initial snapshot data
                                 16                 :                :  *    from the publisher and then synchronizes the position in the stream with
                                 17                 :                :  *    the leader apply worker.
                                 18                 :                :  *
                                 19                 :                :  *    There are several reasons for doing the synchronization this way:
                                 20                 :                :  *     - It allows us to parallelize the initial data synchronization
                                 21                 :                :  *       which lowers the time needed for it to happen.
                                 22                 :                :  *     - The initial synchronization does not have to hold the xid and LSN
                                 23                 :                :  *       for the time it takes to copy data of all tables, causing less
                                 24                 :                :  *       bloat and lower disk consumption compared to doing the
                                 25                 :                :  *       synchronization in a single process for the whole database.
                                 26                 :                :  *     - It allows us to synchronize any tables added after the initial
                                 27                 :                :  *       synchronization has finished.
                                 28                 :                :  *
                                 29                 :                :  *    The stream position synchronization works in multiple steps:
                                 30                 :                :  *     - Apply worker requests a tablesync worker to start, setting the new
                                 31                 :                :  *       table state to INIT.
                                 32                 :                :  *     - Tablesync worker starts; changes table state from INIT to DATASYNC while
                                 33                 :                :  *       copying.
                                 34                 :                :  *     - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
                                 35                 :                :  *       worker specific) state to indicate when the copy phase has completed, so
                                 36                 :                :  *       if the worker crashes with this (non-memory) state then the copy will not
                                 37                 :                :  *       be re-attempted.
                                 38                 :                :  *     - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
                                 39                 :                :  *     - Apply worker periodically checks for tables in SYNCWAIT state.  When
                                 40                 :                :  *       any appear, it sets the table state to CATCHUP and starts loop-waiting
                                 41                 :                :  *       until either the table state is set to SYNCDONE or the sync worker
                                 42                 :                :  *       exits.
                                 43                 :                :  *     - After the sync worker has seen the state change to CATCHUP, it will
                                 44                 :                :  *       read the stream and apply changes (acting like an apply worker) until
                                 45                 :                :  *       it catches up to the specified stream position.  Then it sets the
                                 46                 :                :  *       state to SYNCDONE.  There might be zero changes applied between
                                 47                 :                :  *       CATCHUP and SYNCDONE, because the sync worker might be ahead of the
                                 48                 :                :  *       apply worker.
                                 49                 :                :  *     - Once the state is set to SYNCDONE, the apply worker will continue
                                 50                 :                :  *       tracking the table until it reaches the SYNCDONE stream position,
                                 51                 :                :  *       at which point it sets state to READY and stops tracking.  Again,
                                 52                 :                :  *       there might be zero changes in between.
                                 53                 :                :  *
                                 54                 :                :  *    So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
                                 55                 :                :  *    -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
                                 56                 :                :  *
                                 57                 :                :  *    The catalog pg_subscription_rel is used to keep information about
                                 58                 :                :  *    subscribed tables and their state.  The catalog holds all states
                                 59                 :                :  *    except SYNCWAIT and CATCHUP which are only in shared memory.
                                 60                 :                :  *
                                 61                 :                :  *    Example flows look like this:
                                 62                 :                :  *     - Apply is in front:
                                 63                 :                :  *        sync:8
                                 64                 :                :  *          -> set in catalog FINISHEDCOPY
                                 65                 :                :  *          -> set in memory SYNCWAIT
                                 66                 :                :  *        apply:10
                                 67                 :                :  *          -> set in memory CATCHUP
                                 68                 :                :  *          -> enter wait-loop
                                 69                 :                :  *        sync:10
                                 70                 :                :  *          -> set in catalog SYNCDONE
                                 71                 :                :  *          -> exit
                                 72                 :                :  *        apply:10
                                 73                 :                :  *          -> exit wait-loop
                                 74                 :                :  *          -> continue rep
                                 75                 :                :  *        apply:11
                                 76                 :                :  *          -> set in catalog READY
                                 77                 :                :  *
                                 78                 :                :  *     - Sync is in front:
                                 79                 :                :  *        sync:10
                                 80                 :                :  *          -> set in catalog FINISHEDCOPY
                                 81                 :                :  *          -> set in memory SYNCWAIT
                                 82                 :                :  *        apply:8
                                 83                 :                :  *          -> set in memory CATCHUP
                                 84                 :                :  *          -> continue per-table filtering
                                 85                 :                :  *        sync:10
                                 86                 :                :  *          -> set in catalog SYNCDONE
                                 87                 :                :  *          -> exit
                                 88                 :                :  *        apply:10
                                 89                 :                :  *          -> set in catalog READY
                                 90                 :                :  *          -> stop per-table filtering
                                 91                 :                :  *          -> continue rep
                                 92                 :                :  *-------------------------------------------------------------------------
                                 93                 :                :  */
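
The state progression described in the header comment can be sketched as a small state machine. This is a minimal illustration, not PostgreSQL code: the `SketchSyncState` enum and both `sketch_*` functions are hypothetical names invented here. The real file passes states around as `char` codes such as `SUBREL_STATE_CATCHUP`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical enum for illustration; ordered as in the header comment. */
typedef enum SketchSyncState
{
    SKETCH_INIT,
    SKETCH_DATASYNC,
    SKETCH_FINISHEDCOPY,
    SKETCH_SYNCWAIT,
    SKETCH_CATCHUP,
    SKETCH_SYNCDONE,
    SKETCH_READY
} SketchSyncState;

/* Return the state that follows 'cur'; READY is terminal. */
SketchSyncState
sketch_next_state(SketchSyncState cur)
{
    assert((int) cur >= (int) SKETCH_INIT && (int) cur <= (int) SKETCH_READY);
    return (cur == SKETCH_READY) ? SKETCH_READY : (SketchSyncState) (cur + 1);
}

/* SYNCWAIT and CATCHUP exist only in shared memory; the rest are in the catalog. */
bool
sketch_state_is_in_catalog(SketchSyncState s)
{
    return s != SKETCH_SYNCWAIT && s != SKETCH_CATCHUP;
}
```

Under this sketch, `sketch_state_is_in_catalog(SKETCH_FINISHEDCOPY)` is true, which mirrors the header's point that a worker crashing after the copy phase does not re-attempt the copy.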
                                 94                 :                : 
                                 95                 :                : #include "postgres.h"
                                 96                 :                : 
                                 97                 :                : #include "access/table.h"
                                 98                 :                : #include "access/xact.h"
                                 99                 :                : #include "catalog/indexing.h"
                                100                 :                : #include "catalog/pg_subscription_rel.h"
                                101                 :                : #include "catalog/pg_type.h"
                                102                 :                : #include "commands/copy.h"
                                103                 :                : #include "miscadmin.h"
                                104                 :                : #include "nodes/makefuncs.h"
                                105                 :                : #include "parser/parse_relation.h"
                                106                 :                : #include "pgstat.h"
                                107                 :                : #include "replication/logicallauncher.h"
                                108                 :                : #include "replication/logicalrelation.h"
                                109                 :                : #include "replication/logicalworker.h"
                                110                 :                : #include "replication/origin.h"
                                111                 :                : #include "replication/slot.h"
                                112                 :                : #include "replication/walreceiver.h"
                                113                 :                : #include "replication/worker_internal.h"
                                114                 :                : #include "storage/ipc.h"
                                115                 :                : #include "storage/lmgr.h"
                                116                 :                : #include "utils/acl.h"
                                117                 :                : #include "utils/array.h"
                                118                 :                : #include "utils/builtins.h"
                                119                 :                : #include "utils/lsyscache.h"
                                120                 :                : #include "utils/rls.h"
                                121                 :                : #include "utils/snapmgr.h"
                                122                 :                : #include "utils/syscache.h"
                                123                 :                : #include "utils/usercontext.h"
                                124                 :                : 
                                125                 :                : List       *table_states_not_ready = NIL;
                                126                 :                : 
                                127                 :                : static StringInfo copybuf = NULL;
                                128                 :                : 
                                129                 :                : /*
                                130                 :                :  * Wait until the relation sync state is set in the catalog to the expected
                                131                 :                :  * one; return true when it happens.
                                132                 :                :  *
                                133                 :                :  * Returns false if the table sync worker or the table itself has
                                134                 :                :  * disappeared, or the table state has been reset.
                                135                 :                :  *
                                136                 :                :  * Currently, this is used in the apply worker when transitioning from
                                137                 :                :  * CATCHUP state to SYNCDONE.
                                138                 :                :  */
                                139                 :                : static bool
   12 akapila@postgresql.o      140                 :GNC         182 : wait_for_table_state_change(Oid relid, char expected_state)
                                141                 :                : {
                                142                 :                :     char        state;
                                143                 :                : 
                                144                 :                :     for (;;)
 3141 peter_e@gmx.net           145                 :CBC         204 :     {
                                146                 :                :         LogicalRepWorker *worker;
                                147                 :                :         XLogRecPtr  statelsn;
                                148                 :                : 
 3070                           149         [ -  + ]:            386 :         CHECK_FOR_INTERRUPTS();
                                150                 :                : 
 1839 alvherre@alvh.no-ip.      151                 :            386 :         InvalidateCatalogSnapshot();
 3066 peter_e@gmx.net           152                 :            386 :         state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                                153                 :                :                                         relid, &statelsn);
                                154                 :                : 
                                155         [ -  + ]:            386 :         if (state == SUBREL_STATE_UNKNOWN)
 1839 alvherre@alvh.no-ip.      156                 :UBC           0 :             break;
                                157                 :                : 
 3066 peter_e@gmx.net           158         [ +  + ]:CBC         386 :         if (state == expected_state)
                                159                 :            182 :             return true;
                                160                 :                : 
                                161                 :                :         /* Check if the sync worker is still running and bail if not. */
 3141                           162                 :            204 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 1839 alvherre@alvh.no-ip.      163                 :            204 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
                                164                 :                :                                         false);
 3066 peter_e@gmx.net           165                 :            204 :         LWLockRelease(LogicalRepWorkerLock);
 3141                           166         [ -  + ]:            204 :         if (!worker)
 1839 alvherre@alvh.no-ip.      167                 :LBC         (1) :             break;
                                168                 :                : 
 2531 tmunro@postgresql.or      169                 :CBC         204 :         (void) WaitLatch(MyLatch,
                                170                 :                :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                171                 :                :                          1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                172                 :                : 
 3066 andres@anarazel.de        173                 :            204 :         ResetLatch(MyLatch);
                                174                 :                :     }
                                175                 :                : 
 3066 peter_e@gmx.net           176                 :LBC         (1) :     return false;
                                177                 :                : }
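
The control flow of `wait_for_table_state_change()` above is a bounded polling loop: read the state, stop on "unknown" or on the expected value, stop if the peer worker is gone, otherwise sleep and retry. The sketch below reproduces only that shape. Everything in it is an assumption made for illustration: the callback types, the `SketchSim` script, and the state letters (`'f'`, `'s'`) are placeholders, and the callbacks merely stand in for the catalog lookup, the worker lookup, and the timed latch wait.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical callbacks replacing the catalog read, worker lookup, and wait. */
typedef char (*sketch_get_state_fn) (void *ctx);
typedef bool (*sketch_worker_alive_fn) (void *ctx);
typedef void (*sketch_wait_fn) (void *ctx);

bool
sketch_wait_for_state(char expected, char unknown,
                      sketch_get_state_fn get_state,
                      sketch_worker_alive_fn worker_alive,
                      sketch_wait_fn wait_a_bit,
                      void *ctx)
{
    assert(get_state != NULL && worker_alive != NULL && wait_a_bit != NULL);

    for (;;)
    {
        char        state = get_state(ctx);

        if (state == unknown)
            break;              /* table vanished or its state was reset */
        if (state == expected)
            return true;
        if (!worker_alive(ctx))
            break;              /* nobody is left to advance the state */
        wait_a_bit(ctx);        /* bounded sleep, then poll again */
    }
    return false;
}

/* Scripted simulation: one state code per poll; the last code repeats. */
typedef struct SketchSim
{
    const char *states;
    size_t      pos;
    int         alive_for;      /* liveness checks that still succeed */
} SketchSim;

static char
sim_get_state(void *ctx)
{
    SketchSim  *sim = ctx;
    char        s = sim->states[sim->pos];

    if (s != '\0' && sim->states[sim->pos + 1] != '\0')
        sim->pos++;
    return s;
}

static bool
sim_worker_alive(void *ctx)
{
    SketchSim  *sim = ctx;

    return sim->alive_for-- > 0;
}

static void
sim_wait(void *ctx)
{
    (void) ctx;                 /* a real loop would sleep with a timeout */
}

/* Wait for placeholder state 's' against a scripted sequence of polls. */
bool
sketch_run(const char *states, int alive_for)
{
    SketchSim   sim = {states, 0, alive_for};

    return sketch_wait_for_state('s', '\0', sim_get_state,
                                 sim_worker_alive, sim_wait, &sim);
}
```

For instance, `sketch_run("ffs", 10)` succeeds on the third poll, while `sketch_run("ff", 1)` gives up once the simulated peer stops answering. The second case corresponds to the `break` paths that the coverage data above shows as rarely or never hit.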
                                178                 :                : 
                                179                 :                : /*
                                180                 :                :  * Wait until the apply worker changes the state of our synchronization
                                181                 :                :  * worker to the expected one.
                                182                 :                :  *
                                183                 :                :  * Used when transitioning from SYNCWAIT state to CATCHUP.
                                184                 :                :  *
                                185                 :                :  * Returns false if the apply worker has disappeared.
                                186                 :                :  */
                                187                 :                : static bool
 3066 peter_e@gmx.net           188                 :CBC         184 : wait_for_worker_state_change(char expected_state)
                                189                 :                : {
                                190                 :                :     int         rc;
                                191                 :                : 
                                192                 :                :     for (;;)
                                193                 :            184 :     {
                                194                 :                :         LogicalRepWorker *worker;
                                195                 :                : 
                                196         [ -  + ]:            368 :         CHECK_FOR_INTERRUPTS();
                                197                 :                : 
                                198                 :                :         /*
                                199                 :                :          * Done if already in correct state.  (We assume this fetch is atomic
                                200                 :                :          * enough to not give a misleading answer if we do it with no lock.)
                                201                 :                :          */
 3042 tgl@sss.pgh.pa.us         202         [ +  + ]:            368 :         if (MyLogicalRepWorker->relstate == expected_state)
                                203                 :            184 :             return true;
                                204                 :                : 
                                205                 :                :         /*
                                206                 :                :          * Bail out if the apply worker has died; otherwise signal it that
                                207                 :                :          * we're waiting.
                                208                 :                :          */
 3066 peter_e@gmx.net           209                 :            184 :         LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                210                 :            184 :         worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                211                 :                :                                         InvalidOid, false);
 3042 tgl@sss.pgh.pa.us         212   [ +  -  +  - ]:            184 :         if (worker && worker->proc)
                                213                 :            184 :             logicalrep_worker_wakeup_ptr(worker);
 3066 peter_e@gmx.net           214                 :            184 :         LWLockRelease(LogicalRepWorkerLock);
                                215         [ -  + ]:            184 :         if (!worker)
 3042 tgl@sss.pgh.pa.us         216                 :UBC           0 :             break;
                                217                 :                : 
                                218                 :                :         /*
                                219                 :                :          * Wait.  We expect to get a latch signal back from the apply worker,
                                220                 :                :          * but use a timeout in case it dies without sending one.
                                221                 :                :          */
 3066 andres@anarazel.de        222                 :CBC         184 :         rc = WaitLatch(MyLatch,
                                223                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                224                 :                :                        1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
                                225                 :                : 
 3042 tgl@sss.pgh.pa.us         226         [ +  - ]:            184 :         if (rc & WL_LATCH_SET)
                                227                 :            184 :             ResetLatch(MyLatch);
                                228                 :                :     }
                                229                 :                : 
 3141 peter_e@gmx.net           230                 :UBC           0 :     return false;
                                231                 :                : }
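
The SYNCWAIT to CATCHUP handover that `wait_for_worker_state_change()` waits on leads into the two example flows from the file header ("Apply is in front" and "Sync is in front"). The toy model below replays them with small integers in place of LSNs. It assumes one change per LSN step and that the apply worker hands over its own position as the catch-up target. `SketchCatchup` and `sketch_catchup()` are hypothetical names, not part of PostgreSQL.

```c
#include <assert.h>
#include <stdint.h>

typedef struct SketchCatchup
{
    uint64_t    syncdone_lsn;       /* position at which SYNCDONE is recorded */
    uint64_t    changes_replayed;   /* changes the sync worker applied itself */
} SketchCatchup;

/*
 * Model the catch-up phase: the sync worker sits at sync_lsn in SYNCWAIT,
 * the apply worker (at apply_lsn) flips the state to CATCHUP, and the sync
 * worker replays changes until it reaches the apply worker's position.
 */
SketchCatchup
sketch_catchup(uint64_t sync_lsn, uint64_t apply_lsn)
{
    SketchCatchup result;
    uint64_t    target = apply_lsn; /* assumed catch-up target */

    result.changes_replayed = 0;
    while (sync_lsn < target)
    {
        sync_lsn++;                 /* toy model: one change per LSN step */
        result.changes_replayed++;
    }
    result.syncdone_lsn = sync_lsn;

    /* The sync worker never finishes behind the handed-over target. */
    assert(result.syncdone_lsn >= apply_lsn);
    return result;
}
```

With the header's numbers, `sketch_catchup(8, 10)` replays two changes and ends at 10, while `sketch_catchup(10, 8)` replays none. This is the "zero changes applied between CATCHUP and SYNCDONE" case the header mentions.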
                                232                 :                : 
                                233                 :                : /*
                                234                 :                :  * Handle table synchronization cooperation from the synchronization
                                235                 :                :  * worker.
                                236                 :                :  *
                                237                 :                :  * If the sync worker is in CATCHUP state and reached (or passed) the
                                238                 :                :  * predetermined synchronization point in the WAL stream, mark the table as
                                239                 :                :  * SYNCDONE and finish.
                                240                 :                :  */
                                241                 :                : void
   12 akapila@postgresql.o      242                 :GNC         197 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
                                243                 :                : {
 3141 peter_e@gmx.net           244         [ -  + ]:CBC         197 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                                245                 :                : 
                                246         [ +  - ]:            197 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
                                247         [ +  + ]:            197 :         current_lsn >= MyLogicalRepWorker->relstate_lsn)
                                248                 :                :     {
                                249                 :                :         TimeLineID  tli;
 1719 akapila@postgresql.o      250                 :            184 :         char        syncslotname[NAMEDATALEN] = {0};
 1155                           251                 :            184 :         char        originname[NAMEDATALEN] = {0};
                                252                 :                : 
 3066 peter_e@gmx.net           253                 :            184 :         MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
 3141                           254                 :            184 :         MyLogicalRepWorker->relstate_lsn = current_lsn;
                                255                 :                : 
                                256                 :            184 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                257                 :                : 
                                258                 :                :         /*
                                259                 :                :          * UpdateSubscriptionRelState must be called within a transaction.
                                260                 :                :          */
 1719 akapila@postgresql.o      261         [ +  - ]:            184 :         if (!IsTransactionState())
                                262                 :            184 :             StartTransactionCommand();
                                263                 :                : 
 2762 peter_e@gmx.net           264                 :            184 :         UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                265                 :            184 :                                    MyLogicalRepWorker->relid,
                                266                 :            184 :                                    MyLogicalRepWorker->relstate,
   88 akapila@postgresql.o      267                 :            184 :                                    MyLogicalRepWorker->relstate_lsn,
                                268                 :                :                                    false);
                                269                 :                : 
                                270                 :                :         /*
                                271                 :                :          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                                272                 :                :          * the slot.
                                273                 :                :          */
 1630 alvherre@alvh.no-ip.      274                 :            184 :         walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                                275                 :                : 
                                276                 :                :         /*
                                277                 :                :          * Clean up the tablesync slot.
                                278                 :                :          *
                                279                 :                :          * This has to be done after updating the state; otherwise, if there
                                280                 :                :          * is an error during the database operations, we won't be able to
                                281                 :                :          * roll back the dropped slot.
                                282                 :                :          */
 1719 akapila@postgresql.o      283                 :            184 :         ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
                                284                 :            184 :                                         MyLogicalRepWorker->relid,
                                285                 :                :                                         syncslotname,
                                286                 :                :                                         sizeof(syncslotname));
                                287                 :                : 
                                288                 :                :         /*
                                289                 :                :          * It is important to raise an error if we are unable to drop the
                                290                 :                :          * slot; otherwise, it won't be dropped until the corresponding
                                291                 :                :          * subscription is dropped.  Hence missing_ok = false.
                                292                 :                :          */
 1630 alvherre@alvh.no-ip.      293                 :            184 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
                                294                 :                : 
 1142 akapila@postgresql.o      295                 :            184 :         CommitTransactionCommand();
                                296                 :            184 :         pgstat_report_stat(false);
                                297                 :                : 
                                298                 :                :         /*
                                299                 :                :          * Start a new transaction to clean up the tablesync origin tracking.
                                300                 :                :          * This transaction will be ended within FinishSyncWorker(). Even if
                                301                 :                :          * we fail to remove the origin here, the apply worker will make sure
                                302                 :                :          * to clean it up afterward.
                                303                 :                :          *
                                304                 :                :          * We need to do this after the table state is set to SYNCDONE.
                                305                 :                :          * Otherwise, if an error occurs while performing the database
                                306                 :                :          * operation, the worker will be restarted, but the in-memory state of
                                307                 :                :          * replication progress (remote_lsn), which would already have been
                                308                 :                :          * cleared, won't be rolled back. So, the restarted worker will use
                                309                 :                :          * invalid replication progress state, resulting in replay of
                                310                 :                :          * transactions that have already been applied.
                                311                 :                :          */
                                312                 :            184 :         StartTransactionCommand();
                                313                 :                : 
 1113                           314                 :            184 :         ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                315                 :            184 :                                            MyLogicalRepWorker->relid,
                                316                 :                :                                            originname,
                                317                 :                :                                            sizeof(originname));
                                318                 :                : 
                                319                 :                :         /*
                                320                 :                :          * Resetting the origin session removes the ownership of the slot.
                                321                 :                :          * This is needed to allow the origin to be dropped.
                                322                 :                :          */
 1142                           323                 :            184 :         replorigin_session_reset();
                                324                 :            184 :         replorigin_session_origin = InvalidRepOriginId;
                                325                 :            184 :         replorigin_session_origin_lsn = InvalidXLogRecPtr;
                                326                 :            184 :         replorigin_session_origin_timestamp = 0;
                                327                 :                : 
                                328                 :                :         /*
                                329                 :                :          * Drop the tablesync's origin tracking if it exists.
                                330                 :                :          *
                                331                 :                :          * There is a chance that the user is concurrently performing a refresh
                                332                 :                :          * for the subscription, where we remove the table state and its origin,
                                333                 :                :          * or that the apply worker has already removed this origin. So pass
                                334                 :                :          * missing_ok = true.
                                335                 :                :          */
                                336                 :            184 :         replorigin_drop_by_name(originname, true, false);
                                337                 :                : 
   12 akapila@postgresql.o      338                 :GNC         184 :         FinishSyncWorker();
                                339                 :                :     }
                                340                 :                :     else
 3141 peter_e@gmx.net           341                 :CBC          13 :         SpinLockRelease(&MyLogicalRepWorker->relmutex);
                                342                 :             13 : }
                                343                 :                : 
                                344                 :                : /*
                                345                 :                :  * Handle table synchronization cooperation from the apply worker.
                                346                 :                :  *
                                347                 :                :  * Walk over all subscription tables that are individually tracked by the
                                348                 :                :  * apply process (currently, all that have state other than
                                349                 :                :  * SUBREL_STATE_READY) and manage synchronization for them.
                                350                 :                :  *
                                351                 :                :  * If there are tables that need synchronizing and are not being synchronized
                                352                 :                :  * yet, start sync workers for them (if there are free slots for sync
                                353                 :                :  * workers).  To prevent starting the sync worker for the same relation at a
                                354                 :                :  * high frequency after a failure, we store its last start time with each sync
                                355                 :                :  * state info.  We start the sync worker for the same relation after waiting
                                356                 :                :  * at least wal_retrieve_retry_interval.
                                357                 :                :  *
                                358                 :                :  * For tables that are being synchronized already, check if sync workers
                                359                 :                :  * either need action from the apply worker or have finished.  This is the
                                360                 :                :  * SYNCWAIT to CATCHUP transition.
                                361                 :                :  *
                                362                 :                :  * If the synchronization position is reached (SYNCDONE), then the table can
                                363                 :                :  * be marked as READY and is no longer tracked.
                                364                 :                :  */
                                365                 :                : void
   12 akapila@postgresql.o      366                 :GNC        3032 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
                                367                 :                : {
                                368                 :                :     struct tablesync_start_time_mapping
                                369                 :                :     {
                                370                 :                :         Oid         relid;
                                371                 :                :         TimestampTz last_start_time;
                                372                 :                :     };
                                373                 :                :     static HTAB *last_start_times = NULL;
                                374                 :                :     ListCell   *lc;
 3095 peter_e@gmx.net           375                 :CBC        3032 :     bool        started_tx = false;
 1026 tgl@sss.pgh.pa.us         376                 :           3032 :     bool        should_exit = false;
   88 akapila@postgresql.o      377                 :           3032 :     Relation    rel = NULL;
                                378                 :                : 
 3141 peter_e@gmx.net           379         [ -  + ]:           3032 :     Assert(!IsTransactionState());
                                380                 :                : 
                                381                 :                :     /* We need up-to-date sync state info for subscription tables here. */
   12 akapila@postgresql.o      382                 :GNC        3032 :     FetchRelationStates(&started_tx);
                                383                 :                : 
                                384                 :                :     /*
                                385                 :                :      * Prepare a hash table for tracking last start times of workers, to avoid
                                386                 :                :      * immediate restarts.  We don't need it if there are no tables that need
                                387                 :                :      * syncing.
                                388                 :                :      */
 1168 tgl@sss.pgh.pa.us         389   [ +  +  +  + ]:CBC        3032 :     if (table_states_not_ready != NIL && !last_start_times)
 3106 peter_e@gmx.net           390                 :            121 :     {
                                391                 :                :         HASHCTL     ctl;
                                392                 :                : 
                                393                 :            121 :         ctl.keysize = sizeof(Oid);
                                394                 :            121 :         ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
                                395                 :            121 :         last_start_times = hash_create("Logical replication table sync worker start times",
                                396                 :                :                                        256, &ctl, HASH_ELEM | HASH_BLOBS);
                                397                 :                :     }
                                398                 :                : 
                                399                 :                :     /*
                                400                 :                :      * Clean up the hash table when we're done with all tables (just to
                                401                 :                :      * release the bit of memory).
                                402                 :                :      */
 1168 tgl@sss.pgh.pa.us         403   [ +  +  +  + ]:           2911 :     else if (table_states_not_ready == NIL && last_start_times)
                                404                 :                :     {
 3106 peter_e@gmx.net           405                 :             90 :         hash_destroy(last_start_times);
                                406                 :             90 :         last_start_times = NULL;
                                407                 :                :     }
                                408                 :                : 
                                409                 :                :     /*
                                410                 :                :      * Process all tables that are being synchronized.
                                411                 :                :      */
 1567 akapila@postgresql.o      412   [ +  +  +  +  :           4863 :     foreach(lc, table_states_not_ready)
                                              +  + ]
                                413                 :                :     {
 3086 bruce@momjian.us          414                 :           1833 :         SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
                                415                 :                : 
 3141 peter_e@gmx.net           416         [ +  + ]:           1833 :         if (rstate->state == SUBREL_STATE_SYNCDONE)
                                417                 :                :         {
                                418                 :                :             /*
                                419                 :                :              * Apply has caught up to the position where the table sync has
                                420                 :                :              * finished.  Mark the table as ready so that the apply will just
                                421                 :                :              * continue to replicate it normally.
                                422                 :                :              */
                                423         [ +  + ]:            181 :             if (current_lsn >= rstate->lsn)
                                424                 :                :             {
                                425                 :                :                 char        originname[NAMEDATALEN];
                                426                 :                : 
                                427                 :            180 :                 rstate->state = SUBREL_STATE_READY;
                                428                 :            180 :                 rstate->lsn = current_lsn;
 3095                           429         [ +  + ]:            180 :                 if (!started_tx)
                                430                 :                :                 {
                                431                 :              9 :                     StartTransactionCommand();
                                432                 :              9 :                     started_tx = true;
                                433                 :                :                 }
                                434                 :                : 
                                435                 :                :                 /*
                                436                 :                :                  * Remove the tablesync origin tracking if it exists.
                                437                 :                :                  *
                                438                 :                :                  * There is a chance that the user is concurrently performing
                                439                 :                :                  * a refresh for the subscription, where we remove the table
                                440                 :                :                  * state and its origin, or that the tablesync worker has
                                441                 :                :                  * already removed this origin. We can't rely on the tablesync
                                442                 :                :                  * worker to remove the origin tracking because, if an error
                                443                 :                :                  * occurs while dropping it, the worker won't be restarted to
                                444                 :                :                  * drop the origin. So pass missing_ok = true.
                                445                 :                :                  *
                                446                 :                :                  * Lock the subscription and origin in the same order as DDL
                                447                 :                :                  * commands do, to avoid deadlocks. See
                                448                 :                :                  * AlterSubscription_refresh.
                                449                 :                :                  */
   88 akapila@postgresql.o      450                 :            180 :                 LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
                                451                 :                :                                  0, AccessShareLock);
                                452                 :                : 
                                453         [ +  - ]:            180 :                 if (!rel)
                                454                 :            180 :                     rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
                                455                 :                : 
 1113                           456                 :            180 :                 ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
                                457                 :                :                                                    rstate->relid,
                                458                 :                :                                                    originname,
                                459                 :                :                                                    sizeof(originname));
 1142                           460                 :            180 :                 replorigin_drop_by_name(originname, true, false);
                                461                 :                : 
                                462                 :                :                 /*
                                463                 :                :                  * Update the state to READY only after the origin cleanup.
                                464                 :                :                  */
 2762 peter_e@gmx.net           465                 :            180 :                 UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                466                 :            180 :                                            rstate->relid, rstate->state,
                                467                 :                :                                            rstate->lsn, true);
                                468                 :                :             }
                                469                 :                :         }
                                470                 :                :         else
                                471                 :                :         {
                                472                 :                :             LogicalRepWorker *syncworker;
                                473                 :                : 
                                474                 :                :             /*
                                475                 :                :              * Look for a sync worker for this relation.
                                476                 :                :              */
 3141                           477                 :           1652 :             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                                478                 :                : 
                                479                 :           1652 :             syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
                                480                 :                :                                                 rstate->relid, false);
                                481                 :                : 
                                482         [ +  + ]:           1652 :             if (syncworker)
                                483                 :                :             {
                                484                 :                :                 /* Found one, update our copy of its state */
                                485         [ -  + ]:            747 :                 SpinLockAcquire(&syncworker->relmutex);
                                486                 :            747 :                 rstate->state = syncworker->relstate;
                                487                 :            747 :                 rstate->lsn = syncworker->relstate_lsn;
 3042 tgl@sss.pgh.pa.us         488         [ +  + ]:            747 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                489                 :                :                 {
                                490                 :                :                     /*
                                491                 :                :                      * Sync worker is waiting for apply.  Tell the sync worker
                                492                 :                :                      * it can catch up now.
                                493                 :                :                      */
                                494                 :            182 :                     syncworker->relstate = SUBREL_STATE_CATCHUP;
                                495                 :            182 :                     syncworker->relstate_lsn =
                                496                 :            182 :                         Max(syncworker->relstate_lsn, current_lsn);
                                497                 :                :                 }
 3141 peter_e@gmx.net           498                 :            747 :                 SpinLockRelease(&syncworker->relmutex);
                                499                 :                : 
                                500                 :                :                 /* If we told worker to catch up, wait for it. */
 3042 tgl@sss.pgh.pa.us         501         [ +  + ]:            747 :                 if (rstate->state == SUBREL_STATE_SYNCWAIT)
                                502                 :                :                 {
                                503                 :                :                     /* Signal the sync worker, as it may be waiting for us. */
                                504         [ +  - ]:            182 :                     if (syncworker->proc)
                                505                 :            182 :                         logicalrep_worker_wakeup_ptr(syncworker);
                                506                 :                : 
                                507                 :                :                     /* Now safe to release the LWLock */
                                508                 :            182 :                     LWLockRelease(LogicalRepWorkerLock);
                                509                 :                : 
  687 akapila@postgresql.o      510         [ +  - ]:            182 :                     if (started_tx)
                                511                 :                :                     {
                                512                 :                :                         /*
                                513                 :                :                          * We must commit the existing transaction to release
                                514                 :                :                          * the existing locks before entering a busy loop.
                                515                 :                :                          * This is required to avoid undetected deadlocks on
                                516                 :                :                          * any existing lock, because the deadlock detector
                                517                 :                :                          * can't detect waits on the latch.
                                518                 :                :                          *
                                519                 :                :                          * Also close any tables prior to the commit.
                                520                 :                :                          */
   88                           521         [ +  + ]:            182 :                         if (rel)
                                522                 :                :                         {
                                523                 :             31 :                             table_close(rel, NoLock);
                                524                 :             31 :                             rel = NULL;
                                525                 :                :                         }
  687                           526                 :            182 :                         CommitTransactionCommand();
                                527                 :            182 :                         pgstat_report_stat(false);
                                528                 :                :                     }
                                529                 :                : 
                                530                 :                :                     /*
                                531                 :                :                      * Enter a busy loop and wait for the synchronization
                                532                 :                :                      * worker to reach the expected state (or die trying).
                                533                 :                :                      */
                                534                 :            182 :                     StartTransactionCommand();
                                535                 :            182 :                     started_tx = true;
                                536                 :                : 
   12 akapila@postgresql.o      537                 :GNC         182 :                     wait_for_table_state_change(rstate->relid,
                                538                 :                :                                                 SUBREL_STATE_SYNCDONE);
                                539                 :                :                 }
                                540                 :                :                 else
 3042 tgl@sss.pgh.pa.us         541                 :CBC         565 :                     LWLockRelease(LogicalRepWorkerLock);
                                542                 :                :             }
                                543                 :                :             else
                                544                 :                :             {
                                545                 :                :                 /*
                                546                 :                :                  * If there is no sync worker for this table yet, count
                                547                 :                :                  * running sync workers for this subscription, while we have
                                548                 :                :                  * the lock.
                                549                 :                :                  */
                                550                 :                :                 int         nsyncworkers =
  893                           551                 :            905 :                     logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
                                552                 :                : 
                                553                 :                :                 /* Now safe to release the LWLock */
 3042                           554                 :            905 :                 LWLockRelease(LogicalRepWorkerLock);
                                555                 :                : 
                                556                 :                :                 /*
                                557                 :                :                  * If there are free sync worker slot(s), start a new sync
                                558                 :                :                  * worker for the table.
                                559                 :                :                  */
                                560         [ +  + ]:            905 :                 if (nsyncworkers < max_sync_workers_per_subscription)
                                561                 :                :                 {
                                562                 :            208 :                     TimestampTz now = GetCurrentTimestamp();
                                563                 :                :                     struct tablesync_start_time_mapping *hentry;
                                564                 :                :                     bool        found;
                                565                 :                : 
                                566                 :            208 :                     hentry = hash_search(last_start_times, &rstate->relid,
                                567                 :                :                                          HASH_ENTER, &found);
                                568                 :                : 
                                569   [ +  +  +  + ]:            226 :                     if (!found ||
                                570                 :             18 :                         TimestampDifferenceExceeds(hentry->last_start_time, now,
                                571                 :                :                                                    wal_retrieve_retry_interval))
                                572                 :                :                     {
                                573                 :                :                         /*
                                574                 :                :                          * Set the last_start_time even if we fail to start
                                575                 :                :                          * the worker, so that we won't retry until
                                576                 :                :                          * wal_retrieve_retry_interval has elapsed.
                                577                 :                :                          */
                                578                 :            196 :                         hentry->last_start_time = now;
  126                           579                 :            196 :                         (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
                                580                 :            196 :                                                         MyLogicalRepWorker->dbid,
                                581                 :            196 :                                                         MySubscription->oid,
                                582                 :            196 :                                                         MySubscription->name,
                                583                 :            196 :                                                         MyLogicalRepWorker->userid,
                                584                 :                :                                                         rstate->relid,
                                585                 :                :                                                         DSM_HANDLE_INVALID,
                                586                 :                :                                                         false);
                                587                 :                :                     }
                                588                 :                :                 }
                                589                 :                :             }
                                590                 :                :         }
                                591                 :                :     }
                                592                 :                : 
                                593                 :                :     /* Close table if opened */
   88 akapila@postgresql.o      594         [ +  + ]:           3030 :     if (rel)
                                595                 :            149 :         table_close(rel, NoLock);
                                596                 :                : 
                                597                 :                : 
 3095 peter_e@gmx.net           598         [ +  + ]:           3030 :     if (started_tx)
                                599                 :                :     {
                                600                 :                :         /*
                                601                 :                :          * Even when the two_phase mode is requested by the user, it remains
                                602                 :                :          * as 'pending' until all tablesyncs have reached READY state.
                                603                 :                :          *
                                604                 :                :          * When this happens, we restart the apply worker and (if the
                                605                 :                :          * conditions are still ok) then the two_phase tri-state will become
                                606                 :                :          * 'enabled' at that time.
                                607                 :                :          *
                                608                 :                :          * Note: If the subscription has no tables then leave the state as
                                609                 :                :          * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
                                610                 :                :          * work.
                                611                 :                :          */
 1026 tgl@sss.pgh.pa.us         612         [ +  + ]:            866 :         if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
                                613                 :                :         {
                                614                 :             25 :             CommandCounterIncrement();  /* make updates visible */
                                615         [ +  + ]:             25 :             if (AllTablesyncsReady())
                                616                 :                :             {
                                617         [ +  - ]:              6 :                 ereport(LOG,
                                618                 :                :                         (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
                                619                 :                :                                 MySubscription->name)));
                                620                 :              6 :                 should_exit = true;
                                621                 :                :             }
                                622                 :                :         }
                                623                 :                : 
 3095 peter_e@gmx.net           624                 :            866 :         CommitTransactionCommand();
 1301 andres@anarazel.de        625                 :            866 :         pgstat_report_stat(true);
                                626                 :                :     }
                                627                 :                : 
 1026 tgl@sss.pgh.pa.us         628         [ +  + ]:           3030 :     if (should_exit)
                                629                 :                :     {
                                630                 :                :         /*
                                631                 :                :          * Reset the last-start time for this worker so that the launcher will
                                632                 :                :          * restart it without waiting for wal_retrieve_retry_interval.
                                633                 :                :          */
 1010                           634                 :              6 :         ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
                                635                 :                : 
 1026                           636                 :              6 :         proc_exit(0);
                                637                 :                :     }
 3141 peter_e@gmx.net           638                 :           3024 : }
                                639                 :                : 
                                640                 :                : /*
                                641                 :                :  * Create list of columns for COPY based on logical relation mapping.
                                642                 :                :  */
                                643                 :                : static List *
                                644                 :            192 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
                                645                 :                : {
                                646                 :            192 :     List       *attnamelist = NIL;
                                647                 :                :     int         i;
                                648                 :                : 
 3085                           649         [ +  + ]:            516 :     for (i = 0; i < rel->remoterel.natts; i++)
                                650                 :                :     {
 3141                           651                 :            324 :         attnamelist = lappend(attnamelist,
 3085                           652                 :            324 :                               makeString(rel->remoterel.attnames[i]));
                                653                 :                :     }
                                654                 :                : 
                                655                 :                : 
 3141                           656                 :            192 :     return attnamelist;
                                657                 :                : }
                                658                 :                : 
                                659                 :                : /*
                                660                 :                :  * Data source callback for the COPY FROM, which reads from the remote
                                661                 :                :  * connection and passes the data back to our local COPY.
                                662                 :                :  */
                                663                 :                : static int
                                664                 :          13985 : copy_read_data(void *outbuf, int minread, int maxread)
                                665                 :                : {
 3086 bruce@momjian.us          666                 :          13985 :     int         bytesread = 0;
                                667                 :                :     int         avail;
                                668                 :                : 
                                669                 :                :     /* If there is some leftover data from a previous read, use it. */
 3141 peter_e@gmx.net           670                 :          13985 :     avail = copybuf->len - copybuf->cursor;
                                671         [ -  + ]:          13985 :     if (avail)
                                672                 :                :     {
 3141 peter_e@gmx.net           673         [ #  # ]:UBC           0 :         if (avail > maxread)
                                674                 :              0 :             avail = maxread;
                                675                 :              0 :         memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
                                676                 :              0 :         copybuf->cursor += avail;
                                677                 :              0 :         maxread -= avail;
                                678                 :              0 :         bytesread += avail;
                                679                 :                :     }
                                680                 :                : 
 3070 peter_e@gmx.net           681   [ +  -  +  - ]:CBC       13985 :     while (maxread > 0 && bytesread < minread)
                                682                 :                :     {
 3141                           683                 :          13985 :         pgsocket    fd = PGINVALID_SOCKET;
                                684                 :                :         int         len;
                                685                 :          13985 :         char       *buf = NULL;
                                686                 :                : 
                                687                 :                :         for (;;)
                                688                 :                :         {
                                689                 :                :             /* Try to read the data. */
 1630 alvherre@alvh.no-ip.      690                 :          13985 :             len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                                691                 :                : 
 3141 peter_e@gmx.net           692         [ -  + ]:          13985 :             CHECK_FOR_INTERRUPTS();
                                693                 :                : 
                                694         [ -  + ]:          13985 :             if (len == 0)
 3141 peter_e@gmx.net           695                 :UBC           0 :                 break;
 3141 peter_e@gmx.net           696         [ +  + ]:CBC       13985 :             else if (len < 0)
                                697                 :          13985 :                 return bytesread;
                                698                 :                :             else
                                699                 :                :             {
                                700                 :                :                 /* Process the data */
                                701                 :          13795 :                 copybuf->data = buf;
                                702                 :          13795 :                 copybuf->len = len;
                                703                 :          13795 :                 copybuf->cursor = 0;
                                704                 :                : 
                                705                 :          13795 :                 avail = copybuf->len - copybuf->cursor;
                                706         [ -  + ]:          13795 :                 if (avail > maxread)
 3141 peter_e@gmx.net           707                 :UBC           0 :                     avail = maxread;
 3141 peter_e@gmx.net           708                 :CBC       13795 :                 memcpy(outbuf, &copybuf->data[copybuf->cursor], avail);
  334 peter@eisentraut.org      709                 :          13795 :                 outbuf = (char *) outbuf + avail;
 3141 peter_e@gmx.net           710                 :          13795 :                 copybuf->cursor += avail;
                                711                 :          13795 :                 maxread -= avail;
                                712                 :          13795 :                 bytesread += avail;
                                713                 :                :             }
                                714                 :                : 
                                715   [ +  -  +  - ]:          13795 :             if (maxread <= 0 || bytesread >= minread)
                                716                 :          13795 :                 return bytesread;
                                717                 :                :         }
                                718                 :                : 
                                719                 :                :         /*
                                720                 :                :          * Wait for more data or latch.
                                721                 :                :          */
 2531 tmunro@postgresql.or      722                 :UBC           0 :         (void) WaitLatchOrSocket(MyLatch,
                                723                 :                :                                  WL_SOCKET_READABLE | WL_LATCH_SET |
                                724                 :                :                                  WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                725                 :                :                                  fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
                                726                 :                : 
 3066 andres@anarazel.de        727                 :              0 :         ResetLatch(MyLatch);
                                728                 :                :     }
                                729                 :                : 
 3141 peter_e@gmx.net           730                 :              0 :     return bytesread;
                                731                 :                : }
                                732                 :                : 
                                733                 :                : 
                                734                 :                : /*
                                735                 :                :  * Get information about the remote relation, similar to what the RELATION
                                736                 :                :  * message provides during replication.
                                737                 :                :  *
                                738                 :                :  * This function also returns (a) the relation qualifications to be used in
                                739                 :                :  * the COPY command, and (b) whether the remote relation has published any
                                740                 :                :  * generated column.
                                741                 :                :  */
                                742                 :                : static void
  363 akapila@postgresql.o      743                 :CBC         195 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
                                744                 :                :                         List **qual, bool *gencol_published)
                                745                 :                : {
                                746                 :                :     WalRcvExecResult *res;
                                747                 :                :     StringInfoData cmd;
                                748                 :                :     TupleTableSlot *slot;
 2049 peter@eisentraut.org      749                 :            195 :     Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
  363 akapila@postgresql.o      750                 :            195 :     Oid         attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
 1344                           751                 :            195 :     Oid         qualRow[] = {TEXTOID};
                                752                 :                :     bool        isnull;
                                753                 :                :     int         natt;
  368 michael@paquier.xyz       754                 :            195 :     StringInfo  pub_names = NULL;
 1312 tomas.vondra@postgre      755                 :            195 :     Bitmapset  *included_cols = NULL;
  363 akapila@postgresql.o      756                 :            195 :     int         server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
                                757                 :                : 
 3141 peter_e@gmx.net           758                 :            195 :     lrel->nspname = nspname;
                                759                 :            195 :     lrel->relname = relname;
                                760                 :                : 
                                761                 :                :     /* First fetch Oid and replica identity. */
                                762                 :            195 :     initStringInfo(&cmd);
 2049 peter@eisentraut.org      763                 :            195 :     appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
                                764                 :                :                      "  FROM pg_catalog.pg_class c"
                                765                 :                :                      "  INNER JOIN pg_catalog.pg_namespace n"
                                766                 :                :                      "        ON (c.relnamespace = n.oid)"
                                767                 :                :                      " WHERE n.nspname = %s"
                                768                 :                :                      "   AND c.relname = %s",
                                769                 :                :                      quote_literal_cstr(nspname),
                                770                 :                :                      quote_literal_cstr(relname));
 1630 alvherre@alvh.no-ip.      771                 :            195 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                772                 :                :                       lengthof(tableRow), tableRow);
                                773                 :                : 
 3141 peter_e@gmx.net           774         [ -  + ]:            195 :     if (res->status != WALRCV_OK_TUPLES)
 3141 peter_e@gmx.net           775         [ #  # ]:UBC           0 :         ereport(ERROR,
                                776                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                777                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                778                 :                :                         nspname, relname, res->err)));
                                779                 :                : 
 2539 andres@anarazel.de        780                 :CBC         195 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3141 peter_e@gmx.net           781         [ +  + ]:            195 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 3141 peter_e@gmx.net           782         [ +  - ]:GBC           1 :         ereport(ERROR,
                                783                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                784                 :                :                  errmsg("table \"%s.%s\" not found on publisher",
                                785                 :                :                         nspname, relname)));
                                786                 :                : 
 3141 peter_e@gmx.net           787                 :CBC         194 :     lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
                                788         [ -  + ]:            194 :     Assert(!isnull);
                                789                 :            194 :     lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
                                790         [ -  + ]:            194 :     Assert(!isnull);
 2049 peter@eisentraut.org      791                 :            194 :     lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
                                792         [ -  + ]:            194 :     Assert(!isnull);
                                793                 :                : 
 3141 peter_e@gmx.net           794                 :            194 :     ExecDropSingleTupleTableSlot(slot);
                                795                 :            194 :     walrcv_clear_result(res);
                                796                 :                : 
                                797                 :                : 
                                798                 :                :     /*
                                799                 :                :      * Get column lists for each relation.
                                800                 :                :      *
                                801                 :                :      * We need to do this before fetching info about column names and types,
                                802                 :                :      * so that we can skip columns that should not be replicated.
                                803                 :                :      */
  363 akapila@postgresql.o      804         [ +  - ]:            194 :     if (server_version >= 150000)
                                805                 :                :     {
                                806                 :                :         WalRcvExecResult *pubres;
                                807                 :                :         TupleTableSlot *tslot;
 1244                           808                 :            194 :         Oid         attrsRow[] = {INT2VECTOROID};
                                809                 :                : 
                                810                 :                :         /* Build the pub_names comma-separated string. */
  368 michael@paquier.xyz       811                 :            194 :         pub_names = makeStringInfo();
                                812                 :            194 :         GetPublicationsStr(MySubscription->publications, pub_names, true);
                                813                 :                : 
                                814                 :                :         /*
                                815                 :                :          * Fetch info about column lists for the relation (from all the
                                816                 :                :          * publications).
                                817                 :                :          */
 1312 tomas.vondra@postgre      818                 :            194 :         resetStringInfo(&cmd);
                                819                 :            194 :         appendStringInfo(&cmd,
                                820                 :                :                          "SELECT DISTINCT"
                                821                 :                :                          "  (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
                                822                 :                :                          "   THEN NULL ELSE gpt.attrs END)"
                                823                 :                :                          "  FROM pg_publication p,"
                                824                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt,"
                                825                 :                :                          "  pg_class c"
                                826                 :                :                          " WHERE gpt.relid = %u AND c.oid = gpt.relid"
                                827                 :                :                          "   AND p.pubname IN ( %s )",
                                828                 :                :                          lrel->remoteid,
                                829                 :                :                          pub_names->data);
                                830                 :                : 
                                831                 :            194 :         pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                832                 :                :                              lengthof(attrsRow), attrsRow);
                                833                 :                : 
                                834         [ -  + ]:            194 :         if (pubres->status != WALRCV_OK_TUPLES)
 1312 tomas.vondra@postgre      835         [ #  # ]:UBC           0 :             ereport(ERROR,
                                836                 :                :                     (errcode(ERRCODE_CONNECTION_FAILURE),
                                837                 :                :                      errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
                                838                 :                :                             nspname, relname, pubres->err)));
                                839                 :                : 
                                840                 :                :         /*
                                841                 :                :          * We don't support the case where the column list is different for
                                842                 :                :          * the same table when combining publications. See comments atop
                                843                 :                :          * fetch_relation_list. So there should be only one row returned.
                                844                 :                :          * Although we already checked this when creating the subscription, we
                                845                 :                :          * still need to check here in case the column list was changed after
                                846                 :                :          * creating the subscription and before the sync worker is started.
                                847                 :                :          */
 1244 akapila@postgresql.o      848         [ -  + ]:CBC         194 :         if (tuplestore_tuple_count(pubres->tuplestore) > 1)
 1244 akapila@postgresql.o      849         [ #  # ]:UBC           0 :             ereport(ERROR,
                                850                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                851                 :                :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                                852                 :                :                            nspname, relname));
                                853                 :                : 
                                854                 :                :         /*
                                855                 :                :          * Get the column list and build a single bitmap with the attnums.
                                856                 :                :          *
                                857                 :                :          * If we find a NULL value, it means all the columns should be
                                858                 :                :          * replicated.
                                859                 :                :          */
 1165 drowley@postgresql.o      860                 :CBC         194 :         tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
                                861         [ +  - ]:            194 :         if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
                                862                 :                :         {
                                863                 :            194 :             Datum       cfval = slot_getattr(tslot, 1, &isnull);
                                864                 :                : 
 1244 akapila@postgresql.o      865         [ +  + ]:            194 :             if (!isnull)
                                866                 :                :             {
                                867                 :                :                 ArrayType  *arr;
                                868                 :                :                 int         nelems;
                                869                 :                :                 int16      *elems;
                                870                 :                : 
                                871                 :             22 :                 arr = DatumGetArrayTypeP(cfval);
                                872                 :             22 :                 nelems = ARR_DIMS(arr)[0];
                                873         [ -  + ]:             22 :                 elems = (int16 *) ARR_DATA_PTR(arr);
                                874                 :                : 
                                875         [ +  + ]:             59 :                 for (natt = 0; natt < nelems; natt++)
                                876                 :             37 :                     included_cols = bms_add_member(included_cols, elems[natt]);
                                877                 :                :             }
                                878                 :                : 
 1165 drowley@postgresql.o      879                 :            194 :             ExecClearTuple(tslot);
                                880                 :                :         }
                                881                 :            194 :         ExecDropSingleTupleTableSlot(tslot);
                                882                 :                : 
 1312 tomas.vondra@postgre      883                 :            194 :         walrcv_clear_result(pubres);
                                884                 :                :     }
                                885                 :                : 
                                886                 :                :     /*
                                887                 :                :      * Now fetch column names and types.
                                888                 :                :      */
 3141 peter_e@gmx.net           889                 :            194 :     resetStringInfo(&cmd);
  200 drowley@postgresql.o      890                 :            194 :     appendStringInfoString(&cmd,
                                891                 :                :                            "SELECT a.attnum,"
                                892                 :                :                            "       a.attname,"
                                893                 :                :                            "       a.atttypid,"
                                894                 :                :                            "       a.attnum = ANY(i.indkey)");
                                895                 :                : 
                                896                 :                :     /* Generated columns can be replicated since version 18. */
  363 akapila@postgresql.o      897         [ +  - ]:            194 :     if (server_version >= 180000)
  200 drowley@postgresql.o      898                 :            194 :         appendStringInfoString(&cmd, ", a.attgenerated != ''");
                                899                 :                : 
  363 akapila@postgresql.o      900         [ +  - ]:            388 :     appendStringInfo(&cmd,
                                901                 :                :                      "  FROM pg_catalog.pg_attribute a"
                                902                 :                :                      "  LEFT JOIN pg_catalog.pg_index i"
                                903                 :                :                      "       ON (i.indexrelid = pg_get_replica_identity_index(%u))"
                                904                 :                :                      " WHERE a.attnum > 0::pg_catalog.int2"
                                905                 :                :                      "   AND NOT a.attisdropped %s"
                                906                 :                :                      "   AND a.attrelid = %u"
                                907                 :                :                      " ORDER BY a.attnum",
                                908                 :                :                      lrel->remoteid,
                                909         [ -  + ]:            194 :                      (server_version >= 120000 && server_version < 180000 ?
                                910                 :                :                       "AND a.attgenerated = ''" : ""),
                                911                 :                :                      lrel->remoteid);
 1630 alvherre@alvh.no-ip.      912         [ +  - ]:            194 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
                                913                 :                :                       server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
                                914                 :                : 
 3141 peter_e@gmx.net           915         [ -  + ]:            194 :     if (res->status != WALRCV_OK_TUPLES)
 3141 peter_e@gmx.net           916         [ #  # ]:UBC           0 :         ereport(ERROR,
                                917                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                                918                 :                :                  errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
                                919                 :                :                         nspname, relname, res->err)));
                                920                 :                : 
                                921                 :                :     /* We don't know the number of rows coming, so allocate enough space. */
 3141 peter_e@gmx.net           922                 :CBC         194 :     lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
                                923                 :            194 :     lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
                                924                 :            194 :     lrel->attkeys = NULL;
                                925                 :                : 
                                926                 :                :     /*
                                927                 :                :      * Store the columns as a list of names.  Ignore those that are not
                                928                 :                :      * present in the column list, if there is one.
                                929                 :                :      */
                                930                 :            194 :     natt = 0;
 2539 andres@anarazel.de        931                 :            194 :     slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 3141 peter_e@gmx.net           932         [ +  + ]:            555 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                                933                 :                :     {
                                934                 :                :         char       *rel_colname;
                                935                 :                :         AttrNumber  attnum;
                                936                 :                : 
 1312 tomas.vondra@postgre      937                 :            361 :         attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
                                938         [ -  + ]:            361 :         Assert(!isnull);
                                939                 :                : 
                                940                 :                :         /* If the column is not in the column list, skip it. */
                                941   [ +  +  +  + ]:            361 :         if (included_cols != NULL && !bms_is_member(attnum, included_cols))
                                942                 :                :         {
                                943                 :             31 :             ExecClearTuple(slot);
                                944                 :             31 :             continue;
                                945                 :                :         }
                                946                 :                : 
                                947                 :            330 :         rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
 3141 peter_e@gmx.net           948         [ -  + ]:            330 :         Assert(!isnull);
                                949                 :                : 
 1312 tomas.vondra@postgre      950                 :            330 :         lrel->attnames[natt] = rel_colname;
                                951                 :            330 :         lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 3141 peter_e@gmx.net           952         [ -  + ]:            330 :         Assert(!isnull);
                                953                 :                : 
 1312 tomas.vondra@postgre      954         [ +  + ]:            330 :         if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
 3141 peter_e@gmx.net           955                 :            108 :             lrel->attkeys = bms_add_member(lrel->attkeys, natt);
                                956                 :                : 
                                957                 :                :         /* Remember if the remote table has published any generated column. */
  363 akapila@postgresql.o      958   [ +  -  +  - ]:            330 :         if (server_version >= 180000 && !(*gencol_published))
                                959                 :                :         {
                                960                 :            330 :             *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
                                961         [ -  + ]:            330 :             Assert(!isnull);
                                962                 :                :         }
                                963                 :                : 
                                964                 :                :         /* Should never happen. */
 3141 peter_e@gmx.net           965         [ -  + ]:            330 :         if (++natt >= MaxTupleAttributeNumber)
 3141 peter_e@gmx.net           966         [ #  # ]:UBC           0 :             elog(ERROR, "too many columns in remote table \"%s.%s\"",
                                967                 :                :                  nspname, relname);
                                968                 :                : 
 3141 peter_e@gmx.net           969                 :CBC         330 :         ExecClearTuple(slot);
                                970                 :                :     }
                                971                 :            194 :     ExecDropSingleTupleTableSlot(slot);
                                972                 :                : 
                                973                 :            194 :     lrel->natts = natt;
                                974                 :                : 
                                975                 :            194 :     walrcv_clear_result(res);
                                976                 :                : 
                                977                 :                :     /*
                                978                 :                :      * Get relation's row filter expressions. DISTINCT avoids the same
                                979                 :                :      * expression of a table in multiple publications from being included
                                980                 :                :      * multiple times in the final expression.
                                981                 :                :      *
                                982                 :                :      * We need to copy the row even if it matches just one of the
                                983                 :                :      * publications, so we later combine all the quals with OR.
                                984                 :                :      *
                                 985                 :                :      * For initial synchronization, row filtering can be ignored in the following
                                986                 :                :      * cases:
                                987                 :                :      *
                                988                 :                :      * 1) one of the subscribed publications for the table hasn't specified
                                989                 :                :      * any row filter
                                990                 :                :      *
                                991                 :                :      * 2) one of the subscribed publications has puballtables set to true
                                992                 :                :      *
                                993                 :                :      * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
                                994                 :                :      * that includes this relation
                                995                 :                :      */
  363 akapila@postgresql.o      996         [ +  - ]:            194 :     if (server_version >= 150000)
                                997                 :                :     {
                                998                 :                :         /* Reuse the already-built pub_names. */
  368 michael@paquier.xyz       999         [ -  + ]:            194 :         Assert(pub_names != NULL);
                               1000                 :                : 
                               1001                 :                :         /* Check for row filters. */
 1344 akapila@postgresql.o     1002                 :            194 :         resetStringInfo(&cmd);
                               1003                 :            194 :         appendStringInfo(&cmd,
                               1004                 :                :                          "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
                               1005                 :                :                          "  FROM pg_publication p,"
                               1006                 :                :                          "  LATERAL pg_get_publication_tables(p.pubname) gpt"
                               1007                 :                :                          " WHERE gpt.relid = %u"
                               1008                 :                :                          "   AND p.pubname IN ( %s )",
                               1009                 :                :                          lrel->remoteid,
                               1010                 :                :                          pub_names->data);
                               1011                 :                : 
                               1012                 :            194 :         res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
                               1013                 :                : 
                               1014         [ -  + ]:            194 :         if (res->status != WALRCV_OK_TUPLES)
 1344 akapila@postgresql.o     1015         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1016                 :                :                     (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
                               1017                 :                :                             nspname, relname, res->err)));
                               1018                 :                : 
                               1019                 :                :         /*
                               1020                 :                :          * Multiple row filter expressions for the same table will be combined
                               1021                 :                :          * by COPY using OR. If any of the filter expressions for this table
                               1022                 :                :          * are null, it means the whole table will be copied. In this case it
                               1023                 :                :          * is not necessary to construct a unified row filter expression at
                               1024                 :                :          * all.
                               1025                 :                :          */
 1344 akapila@postgresql.o     1026                 :CBC         194 :         slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
                               1027         [ +  + ]:            209 :         while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
                               1028                 :                :         {
                               1029                 :            198 :             Datum       rf = slot_getattr(slot, 1, &isnull);
                               1030                 :                : 
                               1031         [ +  + ]:            198 :             if (!isnull)
                               1032                 :             15 :                 *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
                               1033                 :                :             else
                               1034                 :                :             {
                                1035                 :                :                 /* Ignore filters and clean up as necessary. */
                               1036         [ +  + ]:            183 :                 if (*qual)
                               1037                 :                :                 {
                               1038                 :              3 :                     list_free_deep(*qual);
                               1039                 :              3 :                     *qual = NIL;
                               1040                 :                :                 }
                               1041                 :            183 :                 break;
                               1042                 :                :             }
                               1043                 :                : 
                               1044                 :             15 :             ExecClearTuple(slot);
                               1045                 :                :         }
                               1046                 :            194 :         ExecDropSingleTupleTableSlot(slot);
                               1047                 :                : 
                               1048                 :            194 :         walrcv_clear_result(res);
  368 michael@paquier.xyz      1049                 :            194 :         destroyStringInfo(pub_names);
                               1050                 :                :     }
                               1051                 :                : 
 3141 peter_e@gmx.net          1052                 :            194 :     pfree(cmd.data);
                               1053                 :            194 : }
                               1054                 :                : 
                               1055                 :                : /*
                               1056                 :                :  * Copy existing data of a table from publisher.
                               1057                 :                :  *
                               1058                 :                :  * Caller is responsible for locking the local relation.
                               1059                 :                :  */
                               1060                 :                : static void
                               1061                 :            195 : copy_table(Relation rel)
                               1062                 :                : {
                               1063                 :                :     LogicalRepRelMapEntry *relmapentry;
                               1064                 :                :     LogicalRepRelation lrel;
 1344 akapila@postgresql.o     1065                 :            195 :     List       *qual = NIL;
                               1066                 :                :     WalRcvExecResult *res;
                               1067                 :                :     StringInfoData cmd;
                               1068                 :                :     CopyFromState cstate;
                               1069                 :                :     List       *attnamelist;
                               1070                 :                :     ParseState *pstate;
  950                          1071                 :            195 :     List       *options = NIL;
  363                          1072                 :            195 :     bool        gencol_published = false;
                               1073                 :                : 
                               1074                 :                :     /* Get the publisher relation info. */
 3141 peter_e@gmx.net          1075                 :            195 :     fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
  363 akapila@postgresql.o     1076                 :            195 :                             RelationGetRelationName(rel), &lrel, &qual,
                               1077                 :                :                             &gencol_published);
                               1078                 :                : 
                               1079                 :                :     /* Put the relation into relmap. */
 3141 peter_e@gmx.net          1080                 :            194 :     logicalrep_relmap_update(&lrel);
                               1081                 :                : 
                               1082                 :                :     /* Map the publisher relation to local one. */
                               1083                 :            194 :     relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
                               1084         [ -  + ]:            192 :     Assert(rel == relmapentry->localrel);
                               1085                 :                : 
                               1086                 :                :     /* Start copy on the publisher. */
                               1087                 :            192 :     initStringInfo(&cmd);
                               1088                 :                : 
                               1089                 :                :     /* Regular table with no row filter or generated columns */
  363 akapila@postgresql.o     1090   [ +  +  +  +  :            192 :     if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
                                              +  + ]
                               1091                 :                :     {
  706                          1092                 :            164 :         appendStringInfo(&cmd, "COPY %s",
 2049 peter@eisentraut.org     1093                 :            164 :                          quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1094                 :                : 
                               1095                 :                :         /* If the table has columns, then specify the columns */
  706 akapila@postgresql.o     1096         [ +  + ]:            164 :         if (lrel.natts)
                               1097                 :                :         {
                               1098                 :            163 :             appendStringInfoString(&cmd, " (");
                               1099                 :                : 
                               1100                 :                :             /*
                               1101                 :                :              * XXX Do we need to list the columns in all cases? Maybe we're
                               1102                 :                :              * replicating all columns?
                               1103                 :                :              */
                               1104         [ +  + ]:            445 :             for (int i = 0; i < lrel.natts; i++)
                               1105                 :                :             {
                               1106         [ +  + ]:            282 :                 if (i > 0)
                               1107                 :            119 :                     appendStringInfoString(&cmd, ", ");
                               1108                 :                : 
                               1109                 :            282 :                 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1110                 :                :             }
                               1111                 :                : 
  566 drowley@postgresql.o     1112                 :            163 :             appendStringInfoChar(&cmd, ')');
                               1113                 :                :         }
                               1114                 :                : 
  706 akapila@postgresql.o     1115                 :            164 :         appendStringInfoString(&cmd, " TO STDOUT");
                               1116                 :                :     }
                               1117                 :                :     else
                               1118                 :                :     {
                               1119                 :                :         /*
                               1120                 :                :          * For non-tables and tables with row filters, we need to do COPY
                               1121                 :                :          * (SELECT ...), but we can't just do SELECT * because we may need to
                                1122                 :                :          * copy only a subset of columns, including generated columns. For tables
                               1123                 :                :          * with any row filters, build a SELECT query with OR'ed row filters
                               1124                 :                :          * for COPY.
                               1125                 :                :          *
                               1126                 :                :          * We also need to use this same COPY (SELECT ...) syntax when
                               1127                 :                :          * generated columns are published, because copy of generated columns
                               1128                 :                :          * is not supported by the normal COPY.
                               1129                 :                :          */
 1839 drowley@postgresql.o     1130                 :             28 :         appendStringInfoString(&cmd, "COPY (SELECT ");
 2049 peter@eisentraut.org     1131         [ +  + ]:             70 :         for (int i = 0; i < lrel.natts; i++)
                               1132                 :                :         {
                               1133                 :             42 :             appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
                               1134         [ +  + ]:             42 :             if (i < lrel.natts - 1)
                               1135                 :             14 :                 appendStringInfoString(&cmd, ", ");
                               1136                 :                :         }
                               1137                 :                : 
 1344 akapila@postgresql.o     1138                 :             28 :         appendStringInfoString(&cmd, " FROM ");
                               1139                 :                : 
                               1140                 :                :         /*
                               1141                 :                :          * For regular tables, make sure we don't copy data from a child that
                               1142                 :                :          * inherits the named table as those will be copied separately.
                               1143                 :                :          */
                               1144         [ +  + ]:             28 :         if (lrel.relkind == RELKIND_RELATION)
                               1145                 :             11 :             appendStringInfoString(&cmd, "ONLY ");
                               1146                 :                : 
                               1147                 :             28 :         appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
                               1148                 :                :         /* list of OR'ed filters */
                               1149         [ +  + ]:             28 :         if (qual != NIL)
                               1150                 :                :         {
                               1151                 :                :             ListCell   *lc;
                               1152                 :             11 :             char       *q = strVal(linitial(qual));
                               1153                 :                : 
                               1154                 :             11 :             appendStringInfo(&cmd, " WHERE %s", q);
                               1155   [ +  -  +  +  :             12 :             for_each_from(lc, qual, 1)
                                              +  + ]
                               1156                 :                :             {
                               1157                 :              1 :                 q = strVal(lfirst(lc));
                               1158                 :              1 :                 appendStringInfo(&cmd, " OR %s", q);
                               1159                 :                :             }
                               1160                 :             11 :             list_free_deep(qual);
                               1161                 :                :         }
                               1162                 :                : 
                               1163                 :             28 :         appendStringInfoString(&cmd, ") TO STDOUT");
                               1164                 :                :     }
                               1165                 :                : 
                               1166                 :                :     /*
                               1167                 :                :      * Prior to v16, initial table synchronization will use text format even
                               1168                 :                :      * if the binary option is enabled for a subscription.
                               1169                 :                :      */
  950                          1170         [ +  - ]:            192 :     if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
                               1171         [ +  + ]:            192 :         MySubscription->binary)
                               1172                 :                :     {
                               1173                 :              5 :         appendStringInfoString(&cmd, " WITH (FORMAT binary)");
                               1174                 :              5 :         options = list_make1(makeDefElem("format",
                               1175                 :                :                                          (Node *) makeString("binary"), -1));
                               1176                 :                :     }
                               1177                 :                : 
 1630 alvherre@alvh.no-ip.     1178                 :            192 :     res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 3141 peter_e@gmx.net          1179                 :            192 :     pfree(cmd.data);
                               1180         [ -  + ]:            192 :     if (res->status != WALRCV_OK_COPY_OUT)
 3141 peter_e@gmx.net          1181         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1182                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1183                 :                :                  errmsg("could not start initial contents copy for table \"%s.%s\": %s",
                               1184                 :                :                         lrel.nspname, lrel.relname, res->err)));
 3141 peter_e@gmx.net          1185                 :CBC         192 :     walrcv_clear_result(res);
                               1186                 :                : 
                               1187                 :            192 :     copybuf = makeStringInfo();
                               1188                 :                : 
 3116                          1189                 :            192 :     pstate = make_parsestate(NULL);
 2126 tgl@sss.pgh.pa.us        1190                 :            192 :     (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
                               1191                 :                :                                          NULL, false, false);
                               1192                 :                : 
 3141 peter_e@gmx.net          1193                 :            192 :     attnamelist = make_copy_attnamelist(relmapentry);
  950 akapila@postgresql.o     1194                 :            192 :     cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
                               1195                 :                : 
                               1196                 :                :     /* Do the copy */
 3141 peter_e@gmx.net          1197                 :            191 :     (void) CopyFrom(cstate);
                               1198                 :                : 
                               1199                 :            184 :     logicalrep_rel_close(relmapentry, NoLock);
                               1200                 :            184 : }
                               1201                 :                : 
                               1202                 :                : /*
                               1203                 :                :  * Determine the tablesync slot name.
                               1204                 :                :  *
                               1205                 :                :  * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
                               1206                 :                :  * on slot name length. We append system_identifier to avoid slot_name
                               1207                 :                :  * collision with subscriptions in other clusters. With the current scheme
                               1208                 :                :  * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
                               1209                 :                :  * length of slot_name will be 50.
                               1210                 :                :  *
                               1211                 :                :  * The returned slot name is stored in the supplied buffer (syncslotname) with
                               1212                 :                :  * the given size.
                               1213                 :                :  *
                               1214                 :                :  * Note: We don't use the subscription slot name as part of tablesync slot name
                               1215                 :                :  * because we are responsible for cleaning up these slots and it could become
                               1216                 :                :  * impossible to recalculate what name to clean up if the subscription slot name
                               1217                 :                :  * had changed.
                               1218                 :                :  */
                               1219                 :                : void
 1719 akapila@postgresql.o     1220                 :            384 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
                               1221                 :                :                                 char *syncslotname, Size szslot)
                               1222                 :                : {
 1716                          1223                 :            384 :     snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
                               1224                 :                :              relid, GetSystemIdentifier());
 1719                          1225                 :            384 : }
                               1226                 :                : 
                               1227                 :                : /*
                               1228                 :                :  * Start syncing the table in the sync worker.
                               1229                 :                :  *
                               1230                 :                :  * If nothing needs to be done to sync the table, we exit the worker without
                               1231                 :                :  * any further action.
                               1232                 :                :  *
                               1233                 :                :  * The returned slot name is palloc'ed in current memory context.
                               1234                 :                :  */
                               1235                 :                : static char *
 3141 peter_e@gmx.net          1236                 :            195 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                               1237                 :                : {
                               1238                 :                :     char       *slotname;
                               1239                 :                :     char       *err;
                               1240                 :                :     char        relstate;
                               1241                 :                :     XLogRecPtr  relstate_lsn;
                               1242                 :                :     Relation    rel;
                               1243                 :                :     AclResult   aclresult;
                               1244                 :                :     WalRcvExecResult *res;
                               1245                 :                :     char        originname[NAMEDATALEN];
                               1246                 :                :     RepOriginId originid;
                               1247                 :                :     UserContext ucxt;
                               1248                 :                :     bool        must_use_password;
                               1249                 :                :     bool        run_as_owner;
                               1250                 :                : 
                               1251                 :                :     /* Check the state of the table synchronization. */
                               1252                 :            195 :     StartTransactionCommand();
 3113 fujii@postgresql.org     1253                 :            195 :     relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
                               1254                 :            195 :                                        MyLogicalRepWorker->relid,
                               1255                 :                :                                        &relstate_lsn);
  742 akapila@postgresql.o     1256                 :            195 :     CommitTransactionCommand();
                               1257                 :                : 
                               1258                 :                :     /* Is the use of a password mandatory? */
  866                          1259         [ +  + ]:            386 :     must_use_password = MySubscription->passwordrequired &&
  742                          1260         [ -  + ]:            191 :         !MySubscription->ownersuperuser;
                               1261                 :                : 
 3141 peter_e@gmx.net          1262         [ -  + ]:            195 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
 3113 fujii@postgresql.org     1263                 :            195 :     MyLogicalRepWorker->relstate = relstate;
                               1264                 :            195 :     MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 3141 peter_e@gmx.net          1265                 :            195 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1266                 :                : 
                               1267                 :                :     /*
                               1268                 :                :      * If synchronization is already done or no longer necessary, exit now
                               1269                 :                :      * that we've updated shared memory state.
                               1270                 :                :      */
 1839 alvherre@alvh.no-ip.     1271         [ -  + ]:            195 :     switch (relstate)
                               1272                 :                :     {
 1839 alvherre@alvh.no-ip.     1273                 :UBC           0 :         case SUBREL_STATE_SYNCDONE:
                               1274                 :                :         case SUBREL_STATE_READY:
                               1275                 :                :         case SUBREL_STATE_UNKNOWN:
   12 akapila@postgresql.o     1276                 :UNC           0 :             FinishSyncWorker(); /* doesn't return */
                               1277                 :                :     }
                               1278                 :                : 
                               1279                 :                :     /* Calculate the name of the tablesync slot. */
 1716 akapila@postgresql.o     1280                 :CBC         195 :     slotname = (char *) palloc(NAMEDATALEN);
                               1281                 :            195 :     ReplicationSlotNameForTablesync(MySubscription->oid,
                               1282                 :            195 :                                     MyLogicalRepWorker->relid,
                               1283                 :                :                                     slotname,
                               1284                 :                :                                     NAMEDATALEN);
                               1285                 :                : 
                               1286                 :                :     /*
                               1287                 :                :      * Here we use the slot name instead of the subscription name as the
                               1288                 :                :      * application_name so that it differs from the leader apply worker's and
                               1289                 :                :      * synchronous replication can distinguish them.
                               1290                 :                :      */
 1630 alvherre@alvh.no-ip.     1291                 :            195 :     LogRepWorkerWalRcvConn =
  631 akapila@postgresql.o     1292                 :            195 :         walrcv_connect(MySubscription->conninfo, true, true,
                               1293                 :                :                        must_use_password,
                               1294                 :                :                        slotname, &err);
 1630 alvherre@alvh.no-ip.     1295         [ -  + ]:            195 :     if (LogRepWorkerWalRcvConn == NULL)
 3141 peter_e@gmx.net          1296         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1297                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1298                 :                :                  errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
                               1299                 :                :                         MySubscription->name, err)));
                               1300                 :                : 
 1839 alvherre@alvh.no-ip.     1301   [ +  +  -  +  :CBC         195 :     Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
                                              -  - ]
                               1302                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
                               1303                 :                :            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
                               1304                 :                : 
                               1305                 :                :     /* Assign the origin tracking record name. */
 1113 akapila@postgresql.o     1306                 :            195 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1307                 :            195 :                                        MyLogicalRepWorker->relid,
                               1308                 :                :                                        originname,
                               1309                 :                :                                        sizeof(originname));
                               1310                 :                : 
 1719                          1311         [ +  + ]:            195 :     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
                               1312                 :                :     {
                               1313                 :                :         /*
                               1314                 :                :          * We have previously errored out before finishing the copy so the
                               1315                 :                :          * replication slot might exist. We want to remove the slot if it
                               1316                 :                :          * already exists and proceed.
                               1317                 :                :          *
                               1318                 :                :          * XXX We could instead try to drop the slot when the failure occurs,
                               1319                 :                :          * but for that we might need to clean up the copy state, as it might
                               1320                 :                :          * be in the middle of fetching the rows. Also, if there is a network
                               1321                 :                :          * breakdown, the drop would not succeed, so trying it on the next
                               1322                 :                :          * attempt seems like a better bet.
                               1323                 :                :          */
 1630 alvherre@alvh.no-ip.     1324                 :              7 :         ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
                               1325                 :                :     }
 1719 akapila@postgresql.o     1326         [ -  + ]:            188 :     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
                               1327                 :                :     {
                               1328                 :                :         /*
                               1329                 :                :          * The COPY phase was previously done, but tablesync then crashed
                               1330                 :                :          * before it was able to finish normally.
                               1331                 :                :          */
 1719 akapila@postgresql.o     1332                 :UBC           0 :         StartTransactionCommand();
                               1333                 :                : 
                               1334                 :                :         /*
                               1335                 :                :          * The origin tracking name must already exist. It was created the
                               1336                 :                :          * first time this tablesync was launched.
                               1337                 :                :          */
                               1338                 :              0 :         originid = replorigin_by_name(originname, false);
 1023                          1339                 :              0 :         replorigin_session_setup(originid, 0);
 1719                          1340                 :              0 :         replorigin_session_origin = originid;
                               1341                 :              0 :         *origin_startpos = replorigin_session_get_progress(false);
                               1342                 :                : 
                               1343                 :              0 :         CommitTransactionCommand();
                               1344                 :                : 
                               1345                 :              0 :         goto copy_table_done;
                               1346                 :                :     }
                               1347                 :                : 
 1839 alvherre@alvh.no-ip.     1348         [ -  + ]:CBC         195 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1349                 :            195 :     MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
                               1350                 :            195 :     MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
                               1351                 :            195 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1352                 :                : 
                               1353                 :                :     /* Update the state and make it visible to others. */
                               1354                 :            195 :     StartTransactionCommand();
                               1355                 :            195 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1356                 :            195 :                                MyLogicalRepWorker->relid,
                               1357                 :            195 :                                MyLogicalRepWorker->relstate,
   88 akapila@postgresql.o     1358                 :            195 :                                MyLogicalRepWorker->relstate_lsn,
                               1359                 :                :                                false);
 1839 alvherre@alvh.no-ip.     1360                 :            195 :     CommitTransactionCommand();
 1301 andres@anarazel.de       1361                 :            195 :     pgstat_report_stat(true);
                               1362                 :                : 
 1839 alvherre@alvh.no-ip.     1363                 :            195 :     StartTransactionCommand();
                               1364                 :                : 
                               1365                 :                :     /*
                               1366                 :                :      * Use a standard write lock here. It might be better to disallow access
                               1367                 :                :      * to the table while it's being synchronized. But we don't want to block
                               1368                 :                :      * the main apply process from working and it has to open the relation in
                               1369                 :                :      * RowExclusiveLock when remapping remote relation id to local one.
                               1370                 :                :      */
                               1371                 :            195 :     rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
                               1372                 :                : 
                               1373                 :                :     /*
                               1374                 :                :      * Start a transaction in the remote node in REPEATABLE READ mode.  This
                               1375                 :                :      * ensures that both the replication slot we create (see below) and the
                               1376                 :                :      * COPY are consistent with each other.
                               1377                 :                :      */
 1630                          1378                 :            195 :     res = walrcv_exec(LogRepWorkerWalRcvConn,
                               1379                 :                :                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                               1380                 :                :                       0, NULL);
 1839                          1381         [ -  + ]:            195 :     if (res->status != WALRCV_OK_COMMAND)
 1839 alvherre@alvh.no-ip.     1382         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1383                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1384                 :                :                  errmsg("table copy could not start transaction on publisher: %s",
                               1385                 :                :                         res->err)));
 1839 alvherre@alvh.no-ip.     1386                 :CBC         195 :     walrcv_clear_result(res);
                               1387                 :                : 
                               1388                 :                :     /*
                               1389                 :                :      * Create a new permanent logical decoding slot. This slot will be used
                               1390                 :                :      * for the catchup phase after COPY is done, so tell it to use the
                               1391                 :                :      * snapshot to make the final data consistent.
                               1392                 :                :      */
 1567 akapila@postgresql.o     1393                 :            195 :     walrcv_create_slot(LogRepWorkerWalRcvConn,
                               1394                 :                :                        slotname, false /* permanent */ , false /* two_phase */ ,
                               1395                 :                :                        MySubscription->failover,
                               1396                 :                :                        CRS_USE_SNAPSHOT, origin_startpos);
                               1397                 :                : 
                               1398                 :                :     /*
                               1399                 :                :      * Set up replication origin tracking. The purpose of doing this before the
                               1400                 :                :      * copy is to avoid doing the copy again due to any error in setting up
                               1401                 :                :      * origin tracking.
                               1402                 :                :      */
 1719                          1403                 :            195 :     originid = replorigin_by_name(originname, true);
                               1404         [ +  - ]:            195 :     if (!OidIsValid(originid))
                               1405                 :                :     {
                               1406                 :                :         /*
                               1407                 :                :          * Origin tracking does not exist, so create it now.
                               1408                 :                :          *
                               1409                 :                :          * Then advance to the LSN obtained from walrcv_create_slot. This is
                               1410                 :                :          * WAL logged for the purpose of recovery. Locks are to prevent the
                               1411                 :                :          * replication origin from vanishing while advancing.
                               1412                 :                :          */
                               1413                 :            195 :         originid = replorigin_create(originname);
                               1414                 :                : 
                               1415                 :            195 :         LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1416                 :            195 :         replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
                               1417                 :                :                            true /* go backward */ , true /* WAL log */ );
                               1418                 :            195 :         UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
                               1419                 :                : 
 1023                          1420                 :            195 :         replorigin_session_setup(originid, 0);
 1719                          1421                 :            195 :         replorigin_session_origin = originid;
                               1422                 :                :     }
                               1423                 :                :     else
                               1424                 :                :     {
 1719 akapila@postgresql.o     1425         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1426                 :                :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1427                 :                :                  errmsg("replication origin \"%s\" already exists",
                               1428                 :                :                         originname)));
                               1429                 :                :     }
                               1430                 :                : 
                               1431                 :                :     /*
                               1432                 :                :      * Make sure that the copy command runs as the table owner, unless the
                               1433                 :                :      * user has opted out of that behaviour.
                               1434                 :                :      */
  872 msawada@postgresql.o     1435                 :CBC         195 :     run_as_owner = MySubscription->runasowner;
                               1436         [ +  + ]:            195 :     if (!run_as_owner)
                               1437                 :            194 :         SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
                               1438                 :                : 
                               1439                 :                :     /*
                               1440                 :                :      * Check that our table sync worker has permission to insert into the
                               1441                 :                :      * target table.
                               1442                 :                :      */
                               1443                 :            195 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
                               1444                 :                :                                   ACL_INSERT);
                               1445         [ -  + ]:            195 :     if (aclresult != ACLCHECK_OK)
  872 msawada@postgresql.o     1446                 :UBC           0 :         aclcheck_error(aclresult,
                               1447                 :              0 :                        get_relkind_objtype(rel->rd_rel->relkind),
                               1448                 :              0 :                        RelationGetRelationName(rel));
                               1449                 :                : 
                               1450                 :                :     /*
                               1451                 :                :      * COPY FROM does not honor RLS policies.  That is not a problem for
                               1452                 :                :      * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
                               1453                 :                :      * who has it implicitly), but other roles should not be able to
                               1454                 :                :      * circumvent RLS.  Disallow logical replication into RLS enabled
                               1455                 :                :      * relations for such roles.
                               1456                 :                :      */
  872 msawada@postgresql.o     1457         [ -  + ]:CBC         195 :     if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
  872 msawada@postgresql.o     1458         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1459                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1460                 :                :                  errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
                               1461                 :                :                         GetUserNameFromId(GetUserId(), true),
                               1462                 :                :                         RelationGetRelationName(rel))));
                               1463                 :                : 
                               1464                 :                :     /* Now do the initial data copy */
 1300 tomas.vondra@postgre     1465                 :CBC         195 :     PushActiveSnapshot(GetTransactionSnapshot());
                               1466                 :            195 :     copy_table(rel);
                               1467                 :            184 :     PopActiveSnapshot();
                               1468                 :                : 
 1630 alvherre@alvh.no-ip.     1469                 :            184 :     res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
 1839                          1470         [ -  + ]:            184 :     if (res->status != WALRCV_OK_COMMAND)
 1839 alvherre@alvh.no-ip.     1471         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1472                 :                :                 (errcode(ERRCODE_CONNECTION_FAILURE),
                               1473                 :                :                  errmsg("table copy could not finish transaction on publisher: %s",
                               1474                 :                :                         res->err)));
 1839 alvherre@alvh.no-ip.     1475                 :CBC         184 :     walrcv_clear_result(res);
                               1476                 :                : 
  861 tgl@sss.pgh.pa.us        1477         [ +  + ]:            184 :     if (!run_as_owner)
  872 msawada@postgresql.o     1478                 :            183 :         RestoreUserContext(&ucxt);
                               1479                 :                : 
 1839 alvherre@alvh.no-ip.     1480                 :            184 :     table_close(rel, NoLock);
                               1481                 :                : 
                               1482                 :                :     /* Make the copy visible. */
                               1483                 :            184 :     CommandCounterIncrement();
                               1484                 :                : 
                               1485                 :                :     /*
                               1486                 :                :      * Update the persisted state to indicate the COPY phase is done; make it
                               1487                 :                :      * visible to others.
                               1488                 :                :      */
 1719 akapila@postgresql.o     1489                 :            184 :     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                               1490                 :            184 :                                MyLogicalRepWorker->relid,
                               1491                 :                :                                SUBREL_STATE_FINISHEDCOPY,
   88                          1492                 :            184 :                                MyLogicalRepWorker->relstate_lsn,
                               1493                 :                :                                false);
                               1494                 :                : 
 1719                          1495                 :            184 :     CommitTransactionCommand();
                               1496                 :                : 
                               1497                 :            184 : copy_table_done:
                               1498                 :                : 
                               1499         [ -  + ]:            184 :     elog(DEBUG1,
                               1500                 :                :          "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
                               1501                 :                :          originname, LSN_FORMAT_ARGS(*origin_startpos));
                               1502                 :                : 
                               1503                 :                :     /*
                               1504                 :                :      * We are done with the initial data synchronization, update the state.
                               1505                 :                :      */
 1839 alvherre@alvh.no-ip.     1506         [ -  + ]:            184 :     SpinLockAcquire(&MyLogicalRepWorker->relmutex);
                               1507                 :            184 :     MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
                               1508                 :            184 :     MyLogicalRepWorker->relstate_lsn = *origin_startpos;
                               1509                 :            184 :     SpinLockRelease(&MyLogicalRepWorker->relmutex);
                               1510                 :                : 
                               1511                 :                :     /*
                               1512                 :                :      * Finally, wait until the leader apply worker tells us to catch up and
                               1513                 :                :      * then return to let LogicalRepApplyLoop do it.
                               1514                 :                :      */
                               1515                 :            184 :     wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 3141 peter_e@gmx.net          1516                 :            184 :     return slotname;
                               1517                 :                : }
                               1518                 :                : 
                               1519                 :                : /*
                               1520                 :                :  * Execute the initial sync with error handling. Disable the subscription
                               1521                 :                :  * if required.
                               1522                 :                :  *
                               1523                 :                :  * Allocate the slot name in long-lived context on return. Note that we don't
                               1524                 :                :  * handle FATAL errors, which are probably caused by system resource errors
                               1525                 :                :  * and are not repeatable.
                               1526                 :                :  */
                               1527                 :                : static void
  817 akapila@postgresql.o     1528                 :            195 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
                               1529                 :                : {
                               1530                 :            195 :     char       *sync_slotname = NULL;
                               1531                 :                : 
                               1532         [ -  + ]:            195 :     Assert(am_tablesync_worker());
                               1533                 :                : 
                               1534         [ +  + ]:            195 :     PG_TRY();
                               1535                 :                :     {
                               1536                 :                :         /* Call initial sync. */
                               1537                 :            195 :         sync_slotname = LogicalRepSyncTableStart(origin_startpos);
                               1538                 :                :     }
                               1539                 :             11 :     PG_CATCH();
                               1540                 :                :     {
                               1541         [ +  + ]:             11 :         if (MySubscription->disableonerr)
                               1542                 :              1 :             DisableSubscriptionAndExit();
                               1543                 :                :         else
                               1544                 :                :         {
                               1545                 :                :             /*
                               1546                 :                :              * Report the worker failed during table synchronization. Abort
                               1547                 :                :              * the current transaction so that the stats message is sent in an
                               1548                 :                :              * idle state.
                               1549                 :                :              */
                               1550                 :             10 :             AbortOutOfAnyTransaction();
                               1551                 :             10 :             pgstat_report_subscription_error(MySubscription->oid, false);
                               1552                 :                : 
                               1553                 :             10 :             PG_RE_THROW();
                               1554                 :                :         }
                               1555                 :                :     }
                               1556         [ -  + ]:            184 :     PG_END_TRY();
                               1557                 :                : 
                               1558                 :                :     /* allocate slot name in long-lived context */
                               1559                 :            184 :     *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
                               1560                 :            184 :     pfree(sync_slotname);
                               1561                 :            184 : }
                               1562                 :                : 
                               1563                 :                : /*
                               1564                 :                :  * Runs the tablesync worker.
                               1565                 :                :  *
                               1566                 :                :  * It starts syncing tables. After a successful sync, it sets streaming
                               1567                 :                :  * options and starts streaming to catch up with the apply worker.
                               1568                 :                :  */
                               1569                 :                : static void
                               1570                 :            195 : run_tablesync_worker()
                               1571                 :                : {
                               1572                 :                :     char        originname[NAMEDATALEN];
                               1573                 :            195 :     XLogRecPtr  origin_startpos = InvalidXLogRecPtr;
                               1574                 :            195 :     char       *slotname = NULL;
                               1575                 :                :     WalRcvStreamOptions options;
                               1576                 :                : 
                               1577                 :            195 :     start_table_sync(&origin_startpos, &slotname);
                               1578                 :                : 
                               1579                 :            184 :     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                               1580                 :            184 :                                        MyLogicalRepWorker->relid,
                               1581                 :                :                                        originname,
                               1582                 :                :                                        sizeof(originname));
                               1583                 :                : 
                               1584                 :            184 :     set_apply_error_context_origin(originname);
                               1585                 :                : 
                               1586                 :            184 :     set_stream_options(&options, slotname, &origin_startpos);
                               1587                 :                : 
                               1588                 :            184 :     walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
                               1589                 :                : 
                               1590                 :                :     /* Apply the changes till we catch up with the apply worker. */
                               1591                 :            184 :     start_apply(origin_startpos);
  817 akapila@postgresql.o     1592                 :UBC           0 : }
                               1593                 :                : 
                               1594                 :                : /* Logical Replication Tablesync worker entry point */
                               1595                 :                : void
  817 akapila@postgresql.o     1596                 :CBC         198 : TablesyncWorkerMain(Datum main_arg)
                               1597                 :                : {
                               1598                 :            198 :     int         worker_slot = DatumGetInt32(main_arg);
                               1599                 :                : 
                               1600                 :            198 :     SetupApplyOrSyncWorker(worker_slot);
                               1601                 :                : 
                               1602                 :            195 :     run_tablesync_worker();
                               1603                 :                : 
   12 akapila@postgresql.o     1604                 :UNC           0 :     FinishSyncWorker();
                               1605                 :                : }
                               1606                 :                : 
                               1607                 :                : /*
                               1608                 :                :  * If the subscription has no tables then return false.
                               1609                 :                :  *
                               1610                 :                :  * Otherwise, are all tablesyncs READY?
                               1611                 :                :  *
                               1612                 :                :  * Note: This function is not suitable to be called from outside of apply or
                               1613                 :                :  * tablesync workers because MySubscription needs to be already initialized.
                               1614                 :                :  */
                               1615                 :                : bool
 1567 akapila@postgresql.o     1616                 :CBC         165 : AllTablesyncsReady(void)
                               1617                 :                : {
                               1618                 :            165 :     bool        started_tx = false;
                               1619                 :            165 :     bool        has_subrels = false;
                               1620                 :                : 
                               1621                 :                :     /* We need up-to-date sync state info for subscription tables here. */
   12 akapila@postgresql.o     1622                 :GNC         165 :     has_subrels = FetchRelationStates(&started_tx);
                               1623                 :                : 
 1567 akapila@postgresql.o     1624         [ +  + ]:CBC         165 :     if (started_tx)
                               1625                 :                :     {
                               1626                 :             15 :         CommitTransactionCommand();
 1301 andres@anarazel.de       1627                 :             15 :         pgstat_report_stat(true);
                               1628                 :                :     }
                               1629                 :                : 
                               1630                 :                :     /*
                               1631                 :                :      * Return false when there are no tables in the subscription or not all
                               1632                 :                :      * tables are in the READY state; true otherwise.
                               1633                 :                :      */
 1168 tgl@sss.pgh.pa.us        1634   [ +  -  +  + ]:            165 :     return has_subrels && (table_states_not_ready == NIL);
                               1635                 :                : }
                               1636                 :                : 
                               1637                 :                : /*
                               1638                 :                :  * Return whether the subscription currently has any tables.
                               1639                 :                :  *
                               1640                 :                :  * Note: Unlike HasSubscriptionTables(), this function relies on cached
                               1641                 :                :  * information for subscription tables. Additionally, it should not be
                               1642                 :                :  * invoked outside of apply or tablesync workers, as MySubscription must be
                               1643                 :                :  * initialized first.
                               1644                 :                :  */
                               1645                 :                : bool
   12 akapila@postgresql.o     1646                 :GNC         100 : HasSubscriptionTablesCached(void)
                               1647                 :                : {
                               1648                 :                :     bool        started_tx;
                               1649                 :                :     bool        has_subrels;
                               1650                 :                : 
                               1651                 :                :     /* We need up-to-date subscription tables info here */
                               1652                 :            100 :     has_subrels = FetchRelationStates(&started_tx);
                               1653                 :                : 
   50                          1654         [ +  + ]:            100 :     if (started_tx)
                               1655                 :                :     {
                               1656                 :              2 :         CommitTransactionCommand();
                               1657                 :              2 :         pgstat_report_stat(true);
                               1658                 :                :     }
                               1659                 :                : 
                               1660                 :            100 :     return has_subrels;
                               1661                 :                : }
                               1662                 :                : 
                               1663                 :                : /*
                               1664                 :                :  * Update the two_phase state of the specified subscription in pg_subscription.
                               1665                 :                :  */
                               1666                 :                : void
 1567 akapila@postgresql.o     1667                 :CBC          10 : UpdateTwoPhaseState(Oid suboid, char new_state)
                               1668                 :                : {
                               1669                 :                :     Relation    rel;
                               1670                 :                :     HeapTuple   tup;
                               1671                 :                :     bool        nulls[Natts_pg_subscription];
                               1672                 :                :     bool        replaces[Natts_pg_subscription];
                               1673                 :                :     Datum       values[Natts_pg_subscription];
                               1674                 :                : 
                               1675   [ +  -  +  -  :             10 :     Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
                                              -  + ]
                               1676                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
                               1677                 :                :            new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
                               1678                 :                : 
                               1679                 :             10 :     rel = table_open(SubscriptionRelationId, RowExclusiveLock);
                               1680                 :             10 :     tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
                               1681         [ -  + ]:             10 :     if (!HeapTupleIsValid(tup))
 1567 akapila@postgresql.o     1682         [ #  # ]:UBC           0 :         elog(ERROR,
                               1683                 :                :              "cache lookup failed for subscription oid %u",
                               1684                 :                :              suboid);
                               1685                 :                : 
                               1686                 :                :     /* Form a new tuple. */
 1567 akapila@postgresql.o     1687                 :CBC          10 :     memset(values, 0, sizeof(values));
                               1688                 :             10 :     memset(nulls, false, sizeof(nulls));
                               1689                 :             10 :     memset(replaces, false, sizeof(replaces));
                               1690                 :                : 
                               1691                 :                :     /* And update/set two_phase state */
                               1692                 :             10 :     values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
                               1693                 :             10 :     replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
                               1694                 :                : 
                               1695                 :             10 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel),
                               1696                 :                :                             values, nulls, replaces);
                               1697                 :             10 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1698                 :                : 
                               1699                 :             10 :     heap_freetuple(tup);
                               1700                 :             10 :     table_close(rel, RowExclusiveLock);
                               1701                 :             10 : }
        

Generated by: LCOV version 2.4-beta