Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * tablesync.c
3 : : * PostgreSQL logical replication: initial table data synchronization
4 : : *
5 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/tablesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for initial table data synchronization for
12 : : * logical replication.
13 : : *
14 : : * The initial data synchronization is done separately for each table,
15 : : * in a separate apply worker that only fetches the initial snapshot data
16 : : * from the publisher and then synchronizes the position in the stream with
17 : : * the leader apply worker.
18 : : *
19 : : * There are several reasons for doing the synchronization this way:
20 : : * - It allows us to parallelize the initial data synchronization
21 : : * which lowers the time needed for it to happen.
22 : : * - The initial synchronization does not have to hold the xid and LSN
23 : : * for the time it takes to copy data of all tables, causing less
24 : : * bloat and lower disk consumption compared to doing the
25 : : * synchronization in a single process for the whole database.
26 : : * - It allows us to synchronize any tables added after the initial
27 : : * synchronization has finished.
28 : : *
29 : : * The stream position synchronization works in multiple steps:
30 : : * - Apply worker requests a tablesync worker to start, setting the new
31 : : * table state to INIT.
32 : : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : : * copying.
34 : : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : : * worker specific) state to indicate when the copy phase has completed, so
36 : : * if the worker crashes with this (non-memory) state then the copy will not
37 : : * be re-attempted.
38 : : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : : * until either the table state is set to SYNCDONE or the sync worker
42 : : * exits.
43 : : * - After the sync worker has seen the state change to CATCHUP, it will
44 : : * read the stream and apply changes (acting like an apply worker) until
45 : : * it catches up to the specified stream position. Then it sets the
46 : : * state to SYNCDONE. There might be zero changes applied between
47 : : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : : * apply worker.
49 : : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : : * the table until it reaches the SYNCDONE stream position, at which
51 : : * point it sets state to READY and stops tracking. Again, there might
52 : : * be zero changes in between.
53 : : *
54 : : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : : *
57 : : * The catalog pg_subscription_rel is used to keep information about
58 : : * subscribed tables and their state. The catalog holds all states
59 : : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : : *
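 * For example, the states persisted in pg_subscription_rel can be inspected
 * with a query like the one below, where the srsubstate values 'i', 'd',
 * 'f', 's' and 'r' correspond to INIT, DATASYNC, FINISHEDCOPY, SYNCDONE and
 * READY:
 *
 *     SELECT srrelid::regclass, srsubstate FROM pg_subscription_rel;
 *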
61 : : * Example flows look like this:
62 : : * - Apply is in front:
63 : : * sync:8
64 : : * -> set in catalog FINISHEDCOPY
65 : : * -> set in memory SYNCWAIT
66 : : * apply:10
67 : : * -> set in memory CATCHUP
68 : : * -> enter wait-loop
69 : : * sync:10
70 : : * -> set in catalog SYNCDONE
71 : : * -> exit
72 : : * apply:10
73 : : * -> exit wait-loop
74 : : * -> continue rep
75 : : * apply:11
76 : : * -> set in catalog READY
77 : : *
78 : : * - Sync is in front:
79 : : * sync:10
80 : : * -> set in catalog FINISHEDCOPY
81 : : * -> set in memory SYNCWAIT
82 : : * apply:8
83 : : * -> set in memory CATCHUP
84 : : * -> continue per-table filtering
85 : : * sync:10
86 : : * -> set in catalog SYNCDONE
87 : : * -> exit
88 : : * apply:10
89 : : * -> set in catalog READY
90 : : * -> stop per-table filtering
91 : : * -> continue rep
92 : : *-------------------------------------------------------------------------
93 : : */
94 : :
95 : : #include "postgres.h"
96 : :
97 : : #include "access/table.h"
98 : : #include "access/xact.h"
99 : : #include "catalog/indexing.h"
100 : : #include "catalog/pg_subscription_rel.h"
101 : : #include "catalog/pg_type.h"
102 : : #include "commands/copy.h"
103 : : #include "miscadmin.h"
104 : : #include "nodes/makefuncs.h"
105 : : #include "parser/parse_relation.h"
106 : : #include "pgstat.h"
107 : : #include "replication/logicallauncher.h"
108 : : #include "replication/logicalrelation.h"
109 : : #include "replication/logicalworker.h"
110 : : #include "replication/origin.h"
111 : : #include "replication/slot.h"
112 : : #include "replication/walreceiver.h"
113 : : #include "replication/worker_internal.h"
114 : : #include "storage/ipc.h"
115 : : #include "storage/lmgr.h"
116 : : #include "utils/acl.h"
117 : : #include "utils/array.h"
118 : : #include "utils/builtins.h"
119 : : #include "utils/lsyscache.h"
120 : : #include "utils/memutils.h"
121 : : #include "utils/rls.h"
122 : : #include "utils/snapmgr.h"
123 : : #include "utils/syscache.h"
124 : : #include "utils/usercontext.h"
125 : :
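/*
 * Validity of the cached information about subscription tables (see
 * table_states_not_ready below). Syscache invalidations, via
 * invalidate_syncing_table_states(), reset this to
 * SYNC_TABLE_STATE_NEEDS_REBUILD so that the list of not-yet-READY tables
 * is rebuilt from the catalog the next time it is needed.
 */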
126 : : typedef enum
127 : : {
128 : : SYNC_TABLE_STATE_NEEDS_REBUILD,
129 : : SYNC_TABLE_STATE_REBUILD_STARTED,
130 : : SYNC_TABLE_STATE_VALID,
131 : : } SyncingTablesState;
132 : :
133 : : static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
134 : : static List *table_states_not_ready = NIL;
135 : : static bool FetchTableStates(bool *started_tx);
136 : :
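/*
 * Buffer holding the data most recently received from the publisher
 * connection; it is drained by copy_read_data() to feed the local COPY.
 */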
137 : : static StringInfo copybuf = NULL;
138 : :
139 : : /*
140 : : * Exit routine for synchronization worker.
141 : : */
142 : : pg_noreturn static void
3089 peter_e@gmx.net 143 :CBC 183 : finish_sync_worker(void)
144 : : {
145 : : /*
146 : : * Commit any outstanding transaction. This is the usual case, unless
147 : : * there was nothing to do for the table.
148 : : */
149 [ + - ]: 183 : if (IsTransactionState())
150 : : {
151 : 183 : CommitTransactionCommand();
1249 andres@anarazel.de 152 : 183 : pgstat_report_stat(true);
153 : : }
154 : :
155 : : /* And flush all writes. */
3089 peter_e@gmx.net 156 : 183 : XLogFlush(GetXLogWriteRecPtr());
157 : :
3027 158 : 183 : StartTransactionCommand();
3089 159 [ + - ]: 183 : ereport(LOG,
160 : : (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
161 : : MySubscription->name,
162 : : get_rel_name(MyLogicalRepWorker->relid))));
3027 163 : 183 : CommitTransactionCommand();
164 : :
165 : : /* Find the leader apply worker and signal it. */
3014 166 : 183 : logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
167 : :
168 : : /* Stop gracefully */
3089 169 : 183 : proc_exit(0);
170 : : }
171 : :
172 : : /*
173 : : * Wait until the relation sync state is set in the catalog to the expected
174 : : * one; return true when it happens.
175 : : *
 176 : : * Returns false if the table sync worker or the table itself has
 177 : : * disappeared, or the table state has been reset.
178 : : *
179 : : * Currently, this is used in the apply worker when transitioning from
180 : : * CATCHUP state to SYNCDONE.
181 : : */
182 : : static bool
3014 183 : 181 : wait_for_relation_state_change(Oid relid, char expected_state)
184 : : {
185 : : char state;
186 : :
187 : : for (;;)
3089 188 : 203 : {
189 : : LogicalRepWorker *worker;
190 : : XLogRecPtr statelsn;
191 : :
3018 192 [ - + ]: 384 : CHECK_FOR_INTERRUPTS();
193 : :
1787 alvherre@alvh.no-ip. 194 : 384 : InvalidateCatalogSnapshot();
3014 peter_e@gmx.net 195 : 384 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
196 : : relid, &statelsn);
197 : :
198 [ - + ]: 384 : if (state == SUBREL_STATE_UNKNOWN)
1787 alvherre@alvh.no-ip. 199 :UBC 0 : break;
200 : :
3014 peter_e@gmx.net 201 [ + + ]:CBC 384 : if (state == expected_state)
202 : 181 : return true;
203 : :
204 : : /* Check if the sync worker is still running and bail if not. */
3089 205 : 203 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1787 alvherre@alvh.no-ip. 206 : 203 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
207 : : false);
3014 peter_e@gmx.net 208 : 203 : LWLockRelease(LogicalRepWorkerLock);
3089 209 [ - + ]: 203 : if (!worker)
1787 alvherre@alvh.no-ip. 210 :UBC 0 : break;
211 : :
2479 tmunro@postgresql.or 212 :CBC 203 : (void) WaitLatch(MyLatch,
213 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
214 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
215 : :
3014 andres@anarazel.de 216 : 203 : ResetLatch(MyLatch);
217 : : }
218 : :
3014 peter_e@gmx.net 219 :UBC 0 : return false;
220 : : }
221 : :
222 : : /*
223 : : * Wait until the apply worker changes the state of our synchronization
224 : : * worker to the expected one.
225 : : *
226 : : * Used when transitioning from SYNCWAIT state to CATCHUP.
227 : : *
228 : : * Returns false if the apply worker has disappeared.
229 : : */
230 : : static bool
3014 peter_e@gmx.net 231 :CBC 183 : wait_for_worker_state_change(char expected_state)
232 : : {
233 : : int rc;
234 : :
235 : : for (;;)
236 : 184 : {
237 : : LogicalRepWorker *worker;
238 : :
239 [ - + ]: 367 : CHECK_FOR_INTERRUPTS();
240 : :
241 : : /*
242 : : * Done if already in correct state. (We assume this fetch is atomic
243 : : * enough to not give a misleading answer if we do it with no lock.)
244 : : */
2990 tgl@sss.pgh.pa.us 245 [ + + ]: 367 : if (MyLogicalRepWorker->relstate == expected_state)
246 : 183 : return true;
247 : :
248 : : /*
249 : : * Bail out if the apply worker has died, else signal it we're
250 : : * waiting.
251 : : */
3014 peter_e@gmx.net 252 : 184 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
253 : 184 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
254 : : InvalidOid, false);
2990 tgl@sss.pgh.pa.us 255 [ + - + - ]: 184 : if (worker && worker->proc)
256 : 184 : logicalrep_worker_wakeup_ptr(worker);
3014 peter_e@gmx.net 257 : 184 : LWLockRelease(LogicalRepWorkerLock);
258 [ - + ]: 184 : if (!worker)
2990 tgl@sss.pgh.pa.us 259 :UBC 0 : break;
260 : :
261 : : /*
262 : : * Wait. We expect to get a latch signal back from the apply worker,
263 : : * but use a timeout in case it dies without sending one.
264 : : */
3014 andres@anarazel.de 265 :CBC 184 : rc = WaitLatch(MyLatch,
266 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
267 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
268 : :
2990 tgl@sss.pgh.pa.us 269 [ + - ]: 184 : if (rc & WL_LATCH_SET)
270 : 184 : ResetLatch(MyLatch);
271 : : }
272 : :
3089 peter_e@gmx.net 273 :UBC 0 : return false;
274 : : }
275 : :
276 : : /*
277 : : * Callback from syscache invalidation.
278 : : */
279 : : void
3089 peter_e@gmx.net 280 :CBC 1715 : invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
281 : : {
499 akapila@postgresql.o 282 : 1715 : table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
3089 peter_e@gmx.net 283 : 1715 : }
284 : :
285 : : /*
286 : : * Handle table synchronization cooperation from the synchronization
287 : : * worker.
288 : : *
289 : : * If the sync worker is in CATCHUP state and reached (or passed) the
290 : : * predetermined synchronization point in the WAL stream, mark the table as
291 : : * SYNCDONE and finish.
292 : : */
293 : : static void
294 : 200 : process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295 : : {
296 [ - + ]: 200 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
297 : :
298 [ + - ]: 200 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
299 [ + + ]: 200 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
300 : : {
301 : : TimeLineID tli;
1667 akapila@postgresql.o 302 : 183 : char syncslotname[NAMEDATALEN] = {0};
1103 303 : 183 : char originname[NAMEDATALEN] = {0};
304 : :
3014 peter_e@gmx.net 305 : 183 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
3089 306 : 183 : MyLogicalRepWorker->relstate_lsn = current_lsn;
307 : :
308 : 183 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
309 : :
310 : : /*
311 : : * UpdateSubscriptionRelState must be called within a transaction.
312 : : */
1667 akapila@postgresql.o 313 [ + - ]: 183 : if (!IsTransactionState())
314 : 183 : StartTransactionCommand();
315 : :
2710 peter_e@gmx.net 316 : 183 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
317 : 183 : MyLogicalRepWorker->relid,
318 : 183 : MyLogicalRepWorker->relstate,
36 akapila@postgresql.o 319 : 183 : MyLogicalRepWorker->relstate_lsn,
320 : : false);
321 : :
322 : : /*
323 : : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
324 : : * the slot.
325 : : */
1578 alvherre@alvh.no-ip. 326 : 183 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
327 : :
328 : : /*
 329 : : * Clean up the tablesync slot.
 330 : : *
 331 : : * This has to be done after updating the state because otherwise, if
 332 : : * there is an error while doing the database operations, we won't be
 333 : : * able to roll back the dropped slot.
334 : : */
1667 akapila@postgresql.o 335 : 183 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
336 : 183 : MyLogicalRepWorker->relid,
337 : : syncslotname,
338 : : sizeof(syncslotname));
339 : :
340 : : /*
 341 : : * It is important to give an error if we are unable to drop the slot;
 342 : : * otherwise, it won't be dropped until the corresponding subscription
 343 : : * is dropped. So passing missing_ok = false.
344 : : */
1578 alvherre@alvh.no-ip. 345 : 183 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
346 : :
1090 akapila@postgresql.o 347 : 183 : CommitTransactionCommand();
348 : 183 : pgstat_report_stat(false);
349 : :
350 : : /*
 351 : : * Start a new transaction to clean up the tablesync origin tracking.
 352 : : * This transaction will be ended within finish_sync_worker(). Even if
 353 : : * we fail to remove the origin here, the apply worker will clean it
 354 : : * up afterward.
 355 : : *
 356 : : * We need to do this after the table state is set to SYNCDONE.
 357 : : * Otherwise, if an error occurs while performing the database
 358 : : * operation, the worker will be restarted, and the in-memory
 359 : : * replication progress (remote_lsn), which would have been cleared
 360 : : * before the restart, won't be rolled back. The restarted worker
 361 : : * would then use an invalid replication progress state, resulting in
 362 : : * replay of transactions that have already been applied.
363 : : */
364 : 183 : StartTransactionCommand();
365 : :
1061 366 : 183 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
367 : 183 : MyLogicalRepWorker->relid,
368 : : originname,
369 : : sizeof(originname));
370 : :
371 : : /*
372 : : * Resetting the origin session removes the ownership of the slot.
373 : : * This is needed to allow the origin to be dropped.
374 : : */
1090 375 : 183 : replorigin_session_reset();
376 : 183 : replorigin_session_origin = InvalidRepOriginId;
377 : 183 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
378 : 183 : replorigin_session_origin_timestamp = 0;
379 : :
380 : : /*
381 : : * Drop the tablesync's origin tracking if exists.
 382 : : * Drop the tablesync's origin tracking if it exists.
 383 : : *
 384 : : * There is a chance that the user is concurrently refreshing the
 385 : : * subscription (which removes the table state and its origin), or
 386 : : * that the apply worker has already removed this origin. So passing
 387 : : * missing_ok = true.
388 : 183 : replorigin_drop_by_name(originname, true, false);
389 : :
3089 peter_e@gmx.net 390 : 183 : finish_sync_worker();
391 : : }
392 : : else
393 : 17 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
394 : 17 : }
395 : :
396 : : /*
397 : : * Handle table synchronization cooperation from the apply worker.
398 : : *
399 : : * Walk over all subscription tables that are individually tracked by the
400 : : * apply process (currently, all that have state other than
401 : : * SUBREL_STATE_READY) and manage synchronization for them.
402 : : *
403 : : * If there are tables that need synchronizing and are not being synchronized
404 : : * yet, start sync workers for them (if there are free slots for sync
405 : : * workers). To prevent starting the sync worker for the same relation at a
406 : : * high frequency after a failure, we store its last start time with each sync
407 : : * state info. We start the sync worker for the same relation after waiting
408 : : * at least wal_retrieve_retry_interval.
409 : : *
410 : : * For tables that are being synchronized already, check if sync workers
411 : : * either need action from the apply worker or have finished. This is the
412 : : * SYNCWAIT to CATCHUP transition.
413 : : *
414 : : * If the synchronization position is reached (SYNCDONE), then the table can
415 : : * be marked as READY and is no longer tracked.
416 : : */
417 : : static void
418 : 3559 : process_syncing_tables_for_apply(XLogRecPtr current_lsn)
419 : : {
420 : : struct tablesync_start_time_mapping
421 : : {
422 : : Oid relid;
423 : : TimestampTz last_start_time;
424 : : };
425 : : static HTAB *last_start_times = NULL;
426 : : ListCell *lc;
3043 427 : 3559 : bool started_tx = false;
974 tgl@sss.pgh.pa.us 428 : 3559 : bool should_exit = false;
36 akapila@postgresql.o 429 : 3559 : Relation rel = NULL;
430 : :
3089 peter_e@gmx.net 431 [ - + ]: 3559 : Assert(!IsTransactionState());
432 : :
433 : : /* We need up-to-date sync state info for subscription tables here. */
1515 akapila@postgresql.o 434 : 3559 : FetchTableStates(&started_tx);
435 : :
436 : : /*
437 : : * Prepare a hash table for tracking last start times of workers, to avoid
438 : : * immediate restarts. We don't need it if there are no tables that need
439 : : * syncing.
440 : : */
1116 tgl@sss.pgh.pa.us 441 [ + + + + ]: 3559 : if (table_states_not_ready != NIL && !last_start_times)
3054 peter_e@gmx.net 442 : 122 : {
443 : : HASHCTL ctl;
444 : :
445 : 122 : ctl.keysize = sizeof(Oid);
446 : 122 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
447 : 122 : last_start_times = hash_create("Logical replication table sync worker start times",
448 : : 256, &ctl, HASH_ELEM | HASH_BLOBS);
449 : : }
450 : :
451 : : /*
452 : : * Clean up the hash table when we're done with all tables (just to
453 : : * release the bit of memory).
454 : : */
1116 tgl@sss.pgh.pa.us 455 [ + + + + ]: 3437 : else if (table_states_not_ready == NIL && last_start_times)
456 : : {
3054 peter_e@gmx.net 457 : 91 : hash_destroy(last_start_times);
458 : 91 : last_start_times = NULL;
459 : : }
460 : :
461 : : /*
462 : : * Process all tables that are being synchronized.
463 : : */
1515 akapila@postgresql.o 464 [ + + + + : 5381 : foreach(lc, table_states_not_ready)
+ + ]
465 : : {
3034 bruce@momjian.us 466 : 1825 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
467 : :
3089 peter_e@gmx.net 468 [ + + ]: 1825 : if (rstate->state == SUBREL_STATE_SYNCDONE)
469 : : {
470 : : /*
471 : : * Apply has caught up to the position where the table sync has
472 : : * finished. Mark the table as ready so that the apply will just
473 : : * continue to replicate it normally.
474 : : */
475 [ + + ]: 182 : if (current_lsn >= rstate->lsn)
476 : : {
477 : : char originname[NAMEDATALEN];
478 : :
479 : 181 : rstate->state = SUBREL_STATE_READY;
480 : 181 : rstate->lsn = current_lsn;
3043 481 [ + + ]: 181 : if (!started_tx)
482 : : {
483 : 9 : StartTransactionCommand();
484 : 9 : started_tx = true;
485 : : }
486 : :
487 : : /*
 488 : : * Remove the tablesync origin tracking if it exists.
 489 : : *
 490 : : * There is a chance that the user is concurrently
 491 : : * refreshing the subscription (which removes the table
 492 : : * state and its origin), or that the tablesync worker has
 493 : : * already removed this origin. We can't rely on the
 494 : : * tablesync worker to remove the origin tracking because,
 495 : : * if there is any error while dropping, we won't restart
 496 : : * it to drop the origin. So passing missing_ok = true.
497 : : *
498 : : * Lock the subscription and origin in the same order as we
499 : : * are doing during DDL commands to avoid deadlocks. See
500 : : * AlterSubscription_refresh.
501 : : */
36 akapila@postgresql.o 502 : 181 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
503 : : 0, AccessShareLock);
504 : :
505 [ + - ]: 181 : if (!rel)
506 : 181 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
507 : :
1061 508 : 181 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
509 : : rstate->relid,
510 : : originname,
511 : : sizeof(originname));
1090 512 : 181 : replorigin_drop_by_name(originname, true, false);
513 : :
514 : : /*
515 : : * Update the state to READY only after the origin cleanup.
516 : : */
2710 peter_e@gmx.net 517 : 181 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
518 : 181 : rstate->relid, rstate->state,
519 : : rstate->lsn, true);
520 : : }
521 : : }
522 : : else
523 : : {
524 : : LogicalRepWorker *syncworker;
525 : :
526 : : /*
527 : : * Look for a sync worker for this relation.
528 : : */
3089 529 : 1643 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
530 : :
531 : 1643 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
532 : : rstate->relid, false);
533 : :
534 [ + + ]: 1643 : if (syncworker)
535 : : {
536 : : /* Found one, update our copy of its state */
537 [ - + ]: 740 : SpinLockAcquire(&syncworker->relmutex);
538 : 740 : rstate->state = syncworker->relstate;
539 : 740 : rstate->lsn = syncworker->relstate_lsn;
2990 tgl@sss.pgh.pa.us 540 [ + + ]: 740 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
541 : : {
542 : : /*
543 : : * Sync worker is waiting for apply. Tell sync worker it
 544 : : * can catch up now.
545 : : */
546 : 181 : syncworker->relstate = SUBREL_STATE_CATCHUP;
547 : 181 : syncworker->relstate_lsn =
548 : 181 : Max(syncworker->relstate_lsn, current_lsn);
549 : : }
3089 peter_e@gmx.net 550 : 740 : SpinLockRelease(&syncworker->relmutex);
551 : :
552 : : /* If we told worker to catch up, wait for it. */
2990 tgl@sss.pgh.pa.us 553 [ + + ]: 740 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
554 : : {
555 : : /* Signal the sync worker, as it may be waiting for us. */
556 [ + - ]: 181 : if (syncworker->proc)
557 : 181 : logicalrep_worker_wakeup_ptr(syncworker);
558 : :
559 : : /* Now safe to release the LWLock */
560 : 181 : LWLockRelease(LogicalRepWorkerLock);
561 : :
635 akapila@postgresql.o 562 [ + - ]: 181 : if (started_tx)
563 : : {
564 : : /*
 565 : : * We must commit the existing transaction to release
 566 : : * its locks before entering a busy loop. This is
 567 : : * required to avoid undetected deadlocks caused by any
 568 : : * existing lock, as the deadlock detector cannot
 569 : : * detect waits on the latch.
570 : : *
571 : : * Also close any tables prior to the commit.
572 : : */
36 573 [ + + ]: 181 : if (rel)
574 : : {
575 : 28 : table_close(rel, NoLock);
576 : 28 : rel = NULL;
577 : : }
635 578 : 181 : CommitTransactionCommand();
579 : 181 : pgstat_report_stat(false);
580 : : }
581 : :
582 : : /*
583 : : * Enter busy loop and wait for synchronization worker to
584 : : * reach expected state (or die trying).
585 : : */
586 : 181 : StartTransactionCommand();
587 : 181 : started_tx = true;
588 : :
2990 tgl@sss.pgh.pa.us 589 : 181 : wait_for_relation_state_change(rstate->relid,
590 : : SUBREL_STATE_SYNCDONE);
591 : : }
592 : : else
593 : 559 : LWLockRelease(LogicalRepWorkerLock);
594 : : }
595 : : else
596 : : {
597 : : /*
598 : : * If there is no sync worker for this table yet, count
599 : : * running sync workers for this subscription, while we have
600 : : * the lock.
601 : : */
602 : : int nsyncworkers =
841 603 : 903 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
604 : :
605 : : /* Now safe to release the LWLock */
2990 606 : 903 : LWLockRelease(LogicalRepWorkerLock);
607 : :
608 : : /*
609 : : * If there are free sync worker slot(s), start a new sync
610 : : * worker for the table.
611 : : */
612 [ + + ]: 903 : if (nsyncworkers < max_sync_workers_per_subscription)
613 : : {
614 : 203 : TimestampTz now = GetCurrentTimestamp();
615 : : struct tablesync_start_time_mapping *hentry;
616 : : bool found;
617 : :
618 : 203 : hentry = hash_search(last_start_times, &rstate->relid,
619 : : HASH_ENTER, &found);
620 : :
621 [ + + + + ]: 216 : if (!found ||
622 : 13 : TimestampDifferenceExceeds(hentry->last_start_time, now,
623 : : wal_retrieve_retry_interval))
624 : : {
625 : : /*
626 : : * Set the last_start_time even if we fail to start
627 : : * the worker, so that we won't retry until
628 : : * wal_retrieve_retry_interval has elapsed.
629 : : */
630 : 196 : hentry->last_start_time = now;
74 631 : 196 : (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
632 : 196 : MyLogicalRepWorker->dbid,
633 : 196 : MySubscription->oid,
634 : 196 : MySubscription->name,
635 : 196 : MyLogicalRepWorker->userid,
636 : : rstate->relid,
637 : : DSM_HANDLE_INVALID,
638 : : false);
639 : : }
640 : : }
641 : : }
642 : : }
643 : : }
644 : :
645 : : /* Close table if opened */
36 akapila@postgresql.o 646 [ + + ]: 3556 : if (rel)
647 : 153 : table_close(rel, NoLock);
648 : :
649 : :
3043 peter_e@gmx.net 650 [ + + ]: 3556 : if (started_tx)
651 : : {
652 : : /*
653 : : * Even when the two_phase mode is requested by the user, it remains
654 : : * as 'pending' until all tablesyncs have reached READY state.
655 : : *
656 : : * When this happens, we restart the apply worker and (if the
657 : : * conditions are still ok) then the two_phase tri-state will become
658 : : * 'enabled' at that time.
659 : : *
660 : : * Note: If the subscription has no tables then leave the state as
661 : : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
662 : : * work.
663 : : */
974 tgl@sss.pgh.pa.us 664 [ + + ]: 859 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
665 : : {
666 : 25 : CommandCounterIncrement(); /* make updates visible */
667 [ + + ]: 25 : if (AllTablesyncsReady())
668 : : {
669 [ + - ]: 6 : ereport(LOG,
670 : : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
671 : : MySubscription->name)));
672 : 6 : should_exit = true;
673 : : }
674 : : }
675 : :
3043 peter_e@gmx.net 676 : 859 : CommitTransactionCommand();
1249 andres@anarazel.de 677 : 859 : pgstat_report_stat(true);
678 : : }
679 : :
974 tgl@sss.pgh.pa.us 680 [ + + ]: 3556 : if (should_exit)
681 : : {
682 : : /*
683 : : * Reset the last-start time for this worker so that the launcher will
684 : : * restart it without waiting for wal_retrieve_retry_interval.
685 : : */
958 686 : 6 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
687 : :
974 688 : 6 : proc_exit(0);
689 : : }
3089 peter_e@gmx.net 690 : 3550 : }
691 : :
692 : : /*
693 : : * Process possible state change(s) of tables that are being synchronized.
694 : : */
695 : : void
696 : 3781 : process_syncing_tables(XLogRecPtr current_lsn)
697 : : {
746 akapila@postgresql.o 698 [ + + + - : 3781 : switch (MyLogicalRepWorker->type)
- ]
699 : : {
700 : 22 : case WORKERTYPE_PARALLEL_APPLY:
701 : :
702 : : /*
703 : : * Skip for parallel apply workers because they only operate on
704 : : * tables that are in a READY state. See pa_can_start() and
705 : : * should_apply_changes_for_rel().
706 : : */
707 : 22 : break;
708 : :
709 : 200 : case WORKERTYPE_TABLESYNC:
710 : 200 : process_syncing_tables_for_sync(current_lsn);
711 : 17 : break;
712 : :
713 : 3559 : case WORKERTYPE_APPLY:
714 : 3559 : process_syncing_tables_for_apply(current_lsn);
715 : 3550 : break;
716 : :
746 akapila@postgresql.o 717 :UBC 0 : case WORKERTYPE_UNKNOWN:
718 : : /* Should never happen. */
719 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
720 : : }
3089 peter_e@gmx.net 721 :CBC 3589 : }
722 : :
723 : : /*
724 : : * Create list of columns for COPY based on logical relation mapping.
725 : : */
726 : : static List *
727 : 192 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
728 : : {
729 : 192 : List *attnamelist = NIL;
730 : : int i;
731 : :
3033 732 [ + + ]: 515 : for (i = 0; i < rel->remoterel.natts; i++)
733 : : {
3089 734 : 323 : attnamelist = lappend(attnamelist,
3033 735 : 323 : makeString(rel->remoterel.attnames[i]));
736 : : }
737 : :
738 : :
3089 739 : 192 : return attnamelist;
740 : : }
741 : :
742 : : /*
743 : : * Data source callback for the COPY FROM, which reads from the remote
744 : : * connection and passes the data back to our local COPY.
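 *
 * Returns the number of bytes placed in outbuf. We try to read at least
 * minread bytes (waiting for more data if necessary) and never more than
 * maxread; fewer bytes may be returned when the remote COPY stream ends.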
745 : : */
746 : : static int
747 : 13984 : copy_read_data(void *outbuf, int minread, int maxread)
748 : : {
3034 bruce@momjian.us 749 : 13984 : int bytesread = 0;
750 : : int avail;
751 : :
 752 : : /* If there is some leftover data from the previous read, use it. */
3089 peter_e@gmx.net 753 : 13984 : avail = copybuf->len - copybuf->cursor;
754 [ - + ]: 13984 : if (avail)
755 : : {
3089 peter_e@gmx.net 756 [ # # ]:UBC 0 : if (avail > maxread)
757 : 0 : avail = maxread;
758 : 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
759 : 0 : copybuf->cursor += avail;
760 : 0 : maxread -= avail;
761 : 0 : bytesread += avail;
762 : : }
763 : :
3018 peter_e@gmx.net 764 [ + - + - ]:CBC 13984 : while (maxread > 0 && bytesread < minread)
765 : : {
3089 766 : 13984 : pgsocket fd = PGINVALID_SOCKET;
767 : : int len;
768 : 13984 : char *buf = NULL;
769 : :
770 : : for (;;)
771 : : {
 772 : : /* Try to read the data. */
1578 alvherre@alvh.no-ip. 773 : 13984 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
774 : :
3089 peter_e@gmx.net 775 [ - + ]: 13984 : CHECK_FOR_INTERRUPTS();
776 : :
777 [ - + ]: 13984 : if (len == 0)
3089 peter_e@gmx.net 778 :UBC 0 : break;
3089 peter_e@gmx.net 779 [ + + ]:CBC 13984 : else if (len < 0)
780 : 13984 : return bytesread;
781 : : else
782 : : {
783 : : /* Process the data */
784 : 13794 : copybuf->data = buf;
785 : 13794 : copybuf->len = len;
786 : 13794 : copybuf->cursor = 0;
787 : :
788 : 13794 : avail = copybuf->len - copybuf->cursor;
789 [ - + ]: 13794 : if (avail > maxread)
3089 peter_e@gmx.net 790 :UBC 0 : avail = maxread;
3089 peter_e@gmx.net 791 :CBC 13794 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
282 peter@eisentraut.org 792 : 13794 : outbuf = (char *) outbuf + avail;
3089 peter_e@gmx.net 793 : 13794 : copybuf->cursor += avail;
794 : 13794 : maxread -= avail;
795 : 13794 : bytesread += avail;
796 : : }
797 : :
798 [ + - + - ]: 13794 : if (maxread <= 0 || bytesread >= minread)
799 : 13794 : return bytesread;
800 : : }
801 : :
802 : : /*
803 : : * Wait for more data or latch.
804 : : */
2479 tmunro@postgresql.or 805 :UBC 0 : (void) WaitLatchOrSocket(MyLatch,
806 : : WL_SOCKET_READABLE | WL_LATCH_SET |
807 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
808 : : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
809 : :
3014 andres@anarazel.de 810 : 0 : ResetLatch(MyLatch);
811 : : }
812 : :
3089 peter_e@gmx.net 813 : 0 : return bytesread;
814 : : }
815 : :
816 : :
817 : : /*
 818 : : * Get information about the remote relation, in a similar fashion to what
 819 : : * the RELATION message provides during replication.
820 : : *
821 : : * This function also returns (a) the relation qualifications to be used in
822 : : * the COPY command, and (b) whether the remote relation has published any
823 : : * generated column.
824 : : */
825 : : static void
311 akapila@postgresql.o 826 :CBC 194 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
827 : : List **qual, bool *gencol_published)
828 : : {
829 : : WalRcvExecResult *res;
830 : : StringInfoData cmd;
831 : : TupleTableSlot *slot;
1997 peter@eisentraut.org 832 : 194 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
311 akapila@postgresql.o 833 : 194 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
1292 834 : 194 : Oid qualRow[] = {TEXTOID};
835 : : bool isnull;
836 : : int natt;
316 michael@paquier.xyz 837 : 194 : StringInfo pub_names = NULL;
1260 tomas.vondra@postgre 838 : 194 : Bitmapset *included_cols = NULL;
311 akapila@postgresql.o 839 : 194 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
840 : :
3089 peter_e@gmx.net 841 : 194 : lrel->nspname = nspname;
842 : 194 : lrel->relname = relname;
843 : :
844 : : /* First fetch Oid and replica identity. */
845 : 194 : initStringInfo(&cmd);
1997 peter@eisentraut.org 846 : 194 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
847 : : " FROM pg_catalog.pg_class c"
848 : : " INNER JOIN pg_catalog.pg_namespace n"
849 : : " ON (c.relnamespace = n.oid)"
850 : : " WHERE n.nspname = %s"
851 : : " AND c.relname = %s",
852 : : quote_literal_cstr(nspname),
853 : : quote_literal_cstr(relname));
1578 alvherre@alvh.no-ip. 854 : 194 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
855 : : lengthof(tableRow), tableRow);
856 : :
3089 peter_e@gmx.net 857 [ - + ]: 194 : if (res->status != WALRCV_OK_TUPLES)
3089 peter_e@gmx.net 858 [ # # ]:UBC 0 : ereport(ERROR,
859 : : (errcode(ERRCODE_CONNECTION_FAILURE),
860 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
861 : : nspname, relname, res->err)));
862 : :
2487 andres@anarazel.de 863 :CBC 194 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3089 peter_e@gmx.net 864 [ - + ]: 194 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3089 peter_e@gmx.net 865 [ # # ]:UBC 0 : ereport(ERROR,
866 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
867 : : errmsg("table \"%s.%s\" not found on publisher",
868 : : nspname, relname)));
869 : :
3089 peter_e@gmx.net 870 :CBC 194 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
871 [ - + ]: 194 : Assert(!isnull);
872 : 194 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
873 [ - + ]: 194 : Assert(!isnull);
1997 peter@eisentraut.org 874 : 194 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
875 [ - + ]: 194 : Assert(!isnull);
876 : :
3089 peter_e@gmx.net 877 : 194 : ExecDropSingleTupleTableSlot(slot);
878 : 194 : walrcv_clear_result(res);
879 : :
880 : :
881 : : /*
882 : : * Get column lists for each relation.
883 : : *
884 : : * We need to do this before fetching info about column names and types,
885 : : * so that we can skip columns that should not be replicated.
886 : : */
311 akapila@postgresql.o 887 [ + - ]: 194 : if (server_version >= 150000)
888 : : {
889 : : WalRcvExecResult *pubres;
890 : : TupleTableSlot *tslot;
1192 891 : 194 : Oid attrsRow[] = {INT2VECTOROID};
892 : :
893 : : /* Build the pub_names comma-separated string. */
316 michael@paquier.xyz 894 : 194 : pub_names = makeStringInfo();
895 : 194 : GetPublicationsStr(MySubscription->publications, pub_names, true);
896 : :
897 : : /*
898 : : * Fetch info about column lists for the relation (from all the
899 : : * publications).
900 : : */
1260 tomas.vondra@postgre 901 : 194 : resetStringInfo(&cmd);
902 : 194 : appendStringInfo(&cmd,
903 : : "SELECT DISTINCT"
904 : : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
905 : : " THEN NULL ELSE gpt.attrs END)"
906 : : " FROM pg_publication p,"
907 : : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
908 : : " pg_class c"
909 : : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
910 : : " AND p.pubname IN ( %s )",
911 : : lrel->remoteid,
912 : : pub_names->data);
913 : :
914 : 194 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
915 : : lengthof(attrsRow), attrsRow);
916 : :
917 [ - + ]: 194 : if (pubres->status != WALRCV_OK_TUPLES)
1260 tomas.vondra@postgre 918 [ # # ]:UBC 0 : ereport(ERROR,
919 : : (errcode(ERRCODE_CONNECTION_FAILURE),
920 : : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
921 : : nspname, relname, pubres->err)));
922 : :
923 : : /*
924 : : * We don't support the case where the column list is different for
925 : : * the same table when combining publications. See comments atop
926 : : * fetch_table_list. So there should be only one row returned.
927 : : * Although we already checked this when creating the subscription, we
928 : : * still need to check here in case the column list was changed after
929 : : * creating the subscription and before the sync worker is started.
930 : : */
1192 akapila@postgresql.o 931 [ - + ]:CBC 194 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
1192 akapila@postgresql.o 932 [ # # ]:UBC 0 : ereport(ERROR,
933 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
934 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
935 : : nspname, relname));
936 : :
937 : : /*
938 : : * Get the column list and build a single bitmap with the attnums.
939 : : *
940 : : * If we find a NULL value, it means all the columns should be
941 : : * replicated.
942 : : */
1113 drowley@postgresql.o 943 :CBC 194 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
944 [ + - ]: 194 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
945 : : {
946 : 194 : Datum cfval = slot_getattr(tslot, 1, &isnull);
947 : :
1192 akapila@postgresql.o 948 [ + + ]: 194 : if (!isnull)
949 : : {
950 : : ArrayType *arr;
951 : : int nelems;
952 : : int16 *elems;
953 : :
954 : 22 : arr = DatumGetArrayTypeP(cfval);
955 : 22 : nelems = ARR_DIMS(arr)[0];
956 [ - + ]: 22 : elems = (int16 *) ARR_DATA_PTR(arr);
957 : :
958 [ + + ]: 59 : for (natt = 0; natt < nelems; natt++)
959 : 37 : included_cols = bms_add_member(included_cols, elems[natt]);
960 : : }
961 : :
1113 drowley@postgresql.o 962 : 194 : ExecClearTuple(tslot);
963 : : }
964 : 194 : ExecDropSingleTupleTableSlot(tslot);
965 : :
1260 tomas.vondra@postgre 966 : 194 : walrcv_clear_result(pubres);
967 : : }
968 : :
969 : : /*
970 : : * Now fetch column names and types.
971 : : */
3089 peter_e@gmx.net 972 : 194 : resetStringInfo(&cmd);
148 drowley@postgresql.o 973 : 194 : appendStringInfoString(&cmd,
974 : : "SELECT a.attnum,"
975 : : " a.attname,"
976 : : " a.atttypid,"
977 : : " a.attnum = ANY(i.indkey)");
978 : :
979 : : /* Generated columns can be replicated since version 18. */
311 akapila@postgresql.o 980 [ + - ]: 194 : if (server_version >= 180000)
148 drowley@postgresql.o 981 : 194 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
982 : :
311 akapila@postgresql.o 983 [ + - ]: 388 : appendStringInfo(&cmd,
984 : : " FROM pg_catalog.pg_attribute a"
985 : : " LEFT JOIN pg_catalog.pg_index i"
986 : : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
987 : : " WHERE a.attnum > 0::pg_catalog.int2"
988 : : " AND NOT a.attisdropped %s"
989 : : " AND a.attrelid = %u"
990 : : " ORDER BY a.attnum",
991 : : lrel->remoteid,
992 [ - + ]: 194 : (server_version >= 120000 && server_version < 180000 ?
993 : : "AND a.attgenerated = ''" : ""),
994 : : lrel->remoteid);
1578 alvherre@alvh.no-ip. 995 [ + - ]: 194 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
996 : : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
997 : :
3089 peter_e@gmx.net 998 [ - + ]: 194 : if (res->status != WALRCV_OK_TUPLES)
3089 peter_e@gmx.net 999 [ # # ]:UBC 0 : ereport(ERROR,
1000 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1001 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
1002 : : nspname, relname, res->err)));
1003 : :
1004 : : /* We don't know the number of rows coming, so allocate enough space. */
3089 peter_e@gmx.net 1005 :CBC 194 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
1006 : 194 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
1007 : 194 : lrel->attkeys = NULL;
1008 : :
1009 : : /*
1010 : : * Store the columns as a list of names. Ignore those that are not
1011 : : * present in the column list, if there is one.
1012 : : */
1013 : 194 : natt = 0;
2487 andres@anarazel.de 1014 : 194 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3089 peter_e@gmx.net 1015 [ + + ]: 554 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1016 : : {
1017 : : char *rel_colname;
1018 : : AttrNumber attnum;
1019 : :
1260 tomas.vondra@postgre 1020 : 360 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
1021 [ - + ]: 360 : Assert(!isnull);
1022 : :
1023 : : /* If the column is not in the column list, skip it. */
1024 [ + + + + ]: 360 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
1025 : : {
1026 : 31 : ExecClearTuple(slot);
1027 : 31 : continue;
1028 : : }
1029 : :
1030 : 329 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3089 peter_e@gmx.net 1031 [ - + ]: 329 : Assert(!isnull);
1032 : :
1260 tomas.vondra@postgre 1033 : 329 : lrel->attnames[natt] = rel_colname;
1034 : 329 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
3089 peter_e@gmx.net 1035 [ - + ]: 329 : Assert(!isnull);
1036 : :
1260 tomas.vondra@postgre 1037 [ + + ]: 329 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
3089 peter_e@gmx.net 1038 : 108 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
1039 : :
1040 : : /* Remember if the remote table has published any generated column. */
311 akapila@postgresql.o 1041 [ + - + - ]: 329 : if (server_version >= 180000 && !(*gencol_published))
1042 : : {
1043 : 329 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
1044 [ - + ]: 329 : Assert(!isnull);
1045 : : }
1046 : :
1047 : : /* Should never happen. */
3089 peter_e@gmx.net 1048 [ - + ]: 329 : if (++natt >= MaxTupleAttributeNumber)
3089 peter_e@gmx.net 1049 [ # # ]:UBC 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
1050 : : nspname, relname);
1051 : :
3089 peter_e@gmx.net 1052 :CBC 329 : ExecClearTuple(slot);
1053 : : }
1054 : 194 : ExecDropSingleTupleTableSlot(slot);
1055 : :
1056 : 194 : lrel->natts = natt;
1057 : :
1058 : 194 : walrcv_clear_result(res);
1059 : :
1060 : : /*
 1061 : : * Get the relation's row filter expressions. DISTINCT prevents the same
 1062 : : * expression of a table in multiple publications from being included
 1063 : : * multiple times in the final expression.
1064 : : *
1065 : : * We need to copy the row even if it matches just one of the
1066 : : * publications, so we later combine all the quals with OR.
1067 : : *
 1068 : : * For initial synchronization, row filtering can be ignored in the following
1069 : : * cases:
1070 : : *
1071 : : * 1) one of the subscribed publications for the table hasn't specified
1072 : : * any row filter
1073 : : *
1074 : : * 2) one of the subscribed publications has puballtables set to true
1075 : : *
1076 : : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
1077 : : * that includes this relation
1078 : : */
311 akapila@postgresql.o 1079 [ + - ]: 194 : if (server_version >= 150000)
1080 : : {
1081 : : /* Reuse the already-built pub_names. */
316 michael@paquier.xyz 1082 [ - + ]: 194 : Assert(pub_names != NULL);
1083 : :
1084 : : /* Check for row filters. */
1292 akapila@postgresql.o 1085 : 194 : resetStringInfo(&cmd);
1086 : 194 : appendStringInfo(&cmd,
1087 : : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1088 : : " FROM pg_publication p,"
1089 : : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1090 : : " WHERE gpt.relid = %u"
1091 : : " AND p.pubname IN ( %s )",
1092 : : lrel->remoteid,
1093 : : pub_names->data);
1094 : :
1095 : 194 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1096 : :
1097 [ - + ]: 194 : if (res->status != WALRCV_OK_TUPLES)
1292 akapila@postgresql.o 1098 [ # # ]:UBC 0 : ereport(ERROR,
1099 : : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1100 : : nspname, relname, res->err)));
1101 : :
1102 : : /*
1103 : : * Multiple row filter expressions for the same table will be combined
1104 : : * by COPY using OR. If any of the filter expressions for this table
1105 : : * are null, it means the whole table will be copied. In this case it
1106 : : * is not necessary to construct a unified row filter expression at
1107 : : * all.
1108 : : */
1292 akapila@postgresql.o 1109 :CBC 194 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1110 [ + + ]: 209 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1111 : : {
1112 : 198 : Datum rf = slot_getattr(slot, 1, &isnull);
1113 : :
1114 [ + + ]: 198 : if (!isnull)
1115 : 15 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1116 : : else
1117 : : {
 1118 : : /* Ignore filters and clean up as necessary. */
1119 [ + + ]: 183 : if (*qual)
1120 : : {
1121 : 3 : list_free_deep(*qual);
1122 : 3 : *qual = NIL;
1123 : : }
1124 : 183 : break;
1125 : : }
1126 : :
1127 : 15 : ExecClearTuple(slot);
1128 : : }
1129 : 194 : ExecDropSingleTupleTableSlot(slot);
1130 : :
1131 : 194 : walrcv_clear_result(res);
316 michael@paquier.xyz 1132 : 194 : destroyStringInfo(pub_names);
1133 : : }
1134 : :
3089 peter_e@gmx.net 1135 : 194 : pfree(cmd.data);
1136 : 194 : }
1137 : :
1138 : : /*
1139 : : * Copy existing data of a table from publisher.
1140 : : *
1141 : : * Caller is responsible for locking the local relation.
1142 : : */
1143 : : static void
1144 : 194 : copy_table(Relation rel)
1145 : : {
1146 : : LogicalRepRelMapEntry *relmapentry;
1147 : : LogicalRepRelation lrel;
1292 akapila@postgresql.o 1148 : 194 : List *qual = NIL;
1149 : : WalRcvExecResult *res;
1150 : : StringInfoData cmd;
1151 : : CopyFromState cstate;
1152 : : List *attnamelist;
1153 : : ParseState *pstate;
898 1154 : 194 : List *options = NIL;
311 1155 : 194 : bool gencol_published = false;
1156 : :
1157 : : /* Get the publisher relation info. */
3089 peter_e@gmx.net 1158 : 194 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
311 akapila@postgresql.o 1159 : 194 : RelationGetRelationName(rel), &lrel, &qual,
1160 : : &gencol_published);
1161 : :
1162 : : /* Put the relation into relmap. */
3089 peter_e@gmx.net 1163 : 194 : logicalrep_relmap_update(&lrel);
1164 : :
 1165 : : /* Map the publisher relation to the local one. */
1166 : 194 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1167 [ - + ]: 192 : Assert(rel == relmapentry->localrel);
1168 : :
1169 : : /* Start copy on the publisher. */
1170 : 192 : initStringInfo(&cmd);
1171 : :
1172 : : /* Regular table with no row filter or generated columns */
311 akapila@postgresql.o 1173 [ + + + + : 192 : if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
+ + ]
1174 : : {
654 1175 : 164 : appendStringInfo(&cmd, "COPY %s",
1997 peter@eisentraut.org 1176 : 164 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1177 : :
1178 : : /* If the table has columns, then specify the columns */
654 akapila@postgresql.o 1179 [ + + ]: 164 : if (lrel.natts)
1180 : : {
1181 : 163 : appendStringInfoString(&cmd, " (");
1182 : :
1183 : : /*
1184 : : * XXX Do we need to list the columns in all cases? Maybe we're
1185 : : * replicating all columns?
1186 : : */
1187 [ + + ]: 444 : for (int i = 0; i < lrel.natts; i++)
1188 : : {
1189 [ + + ]: 281 : if (i > 0)
1190 : 118 : appendStringInfoString(&cmd, ", ");
1191 : :
1192 : 281 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1193 : : }
1194 : :
514 drowley@postgresql.o 1195 : 163 : appendStringInfoChar(&cmd, ')');
1196 : : }
1197 : :
654 akapila@postgresql.o 1198 : 164 : appendStringInfoString(&cmd, " TO STDOUT");
1199 : : }
1200 : : else
1201 : : {
1202 : : /*
 1203 : : * For non-tables and tables with row filters, we need to do COPY
 1204 : : * (SELECT ...); we can't just do SELECT * because we may need to
 1205 : : * copy only a subset of columns, including generated columns. For
 1206 : : * tables with any row filters, build a SELECT query with OR'ed row
 1207 : : * filters for COPY.
 1208 : : *
 1209 : : * We also need to use this same COPY (SELECT ...) syntax when
 1210 : : * generated columns are published, because copying generated columns
 1211 : : * is not supported by plain COPY.
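 *
 * For illustration only (hypothetical table and filter), the constructed
 * command ends up looking like:
 *     COPY (SELECT "a", "b" FROM ONLY public.tab WHERE (a > 10)) TO STDOUT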
1212 : : */
1787 drowley@postgresql.o 1213 : 28 : appendStringInfoString(&cmd, "COPY (SELECT ");
1997 peter@eisentraut.org 1214 [ + + ]: 70 : for (int i = 0; i < lrel.natts; i++)
1215 : : {
1216 : 42 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1217 [ + + ]: 42 : if (i < lrel.natts - 1)
1218 : 14 : appendStringInfoString(&cmd, ", ");
1219 : : }
1220 : :
1292 akapila@postgresql.o 1221 : 28 : appendStringInfoString(&cmd, " FROM ");
1222 : :
1223 : : /*
1224 : : * For regular tables, make sure we don't copy data from a child that
1225 : : * inherits the named table as those will be copied separately.
1226 : : */
1227 [ + + ]: 28 : if (lrel.relkind == RELKIND_RELATION)
1228 : 11 : appendStringInfoString(&cmd, "ONLY ");
1229 : :
1230 : 28 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1231 : : /* list of OR'ed filters */
1232 [ + + ]: 28 : if (qual != NIL)
1233 : : {
1234 : : ListCell *lc;
1235 : 11 : char *q = strVal(linitial(qual));
1236 : :
1237 : 11 : appendStringInfo(&cmd, " WHERE %s", q);
1238 [ + - + + : 12 : for_each_from(lc, qual, 1)
+ + ]
1239 : : {
1240 : 1 : q = strVal(lfirst(lc));
1241 : 1 : appendStringInfo(&cmd, " OR %s", q);
1242 : : }
1243 : 11 : list_free_deep(qual);
1244 : : }
1245 : :
1246 : 28 : appendStringInfoString(&cmd, ") TO STDOUT");
1247 : : }
1248 : :
1249 : : /*
 1250 : : * Prior to v16, initial table synchronization uses text format even
1251 : : * if the binary option is enabled for a subscription.
1252 : : */
898 1253 [ + - ]: 192 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1254 [ + + ]: 192 : MySubscription->binary)
1255 : : {
1256 : 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1257 : 5 : options = list_make1(makeDefElem("format",
1258 : : (Node *) makeString("binary"), -1));
1259 : : }
1260 : :
1578 alvherre@alvh.no-ip. 1261 : 192 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
3089 peter_e@gmx.net 1262 : 192 : pfree(cmd.data);
1263 [ - + ]: 192 : if (res->status != WALRCV_OK_COPY_OUT)
3089 peter_e@gmx.net 1264 [ # # ]:UBC 0 : ereport(ERROR,
1265 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1266 : : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1267 : : lrel.nspname, lrel.relname, res->err)));
3089 peter_e@gmx.net 1268 :CBC 192 : walrcv_clear_result(res);
1269 : :
1270 : 192 : copybuf = makeStringInfo();
1271 : :
3064 1272 : 192 : pstate = make_parsestate(NULL);
2074 tgl@sss.pgh.pa.us 1273 : 192 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1274 : : NULL, false, false);
1275 : :
3089 peter_e@gmx.net 1276 : 192 : attnamelist = make_copy_attnamelist(relmapentry);
898 akapila@postgresql.o 1277 : 192 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1278 : :
1279 : : /* Do the copy */
3089 peter_e@gmx.net 1280 : 191 : (void) CopyFrom(cstate);
1281 : :
1282 : 183 : logicalrep_rel_close(relmapentry, NoLock);
1283 : 183 : }
1284 : :
1285 : : /*
1286 : : * Determine the tablesync slot name.
1287 : : *
1288 : : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1289 : : * on slot name length. We append system_identifier to avoid slot_name
1290 : : * collision with subscriptions in other clusters. With the current scheme
1291 : : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1292 : : * length of slot_name will be 50.
1293 : : *
1294 : : * The returned slot name is stored in the supplied buffer (syncslotname) with
1295 : : * the given size.
1296 : : *
 1297 : : * Note: We don't use the subscription slot name as part of the tablesync slot
 1298 : : * name because we are responsible for cleaning up these slots and it could
 1299 : : * become impossible to recalculate what name to clean up if the subscription
 1300 : : * slot name had changed.
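 *
 * For example, with hypothetical values suboid = 16394, relid = 16385 and
 * system identifier 7234567890123456789, the resulting slot name would be
 * "pg_16394_sync_16385_7234567890123456789".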
1301 : : */
1302 : : void
1667 akapila@postgresql.o 1303 : 383 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1304 : : char *syncslotname, Size szslot)
1305 : : {
1664 1306 : 383 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1307 : : relid, GetSystemIdentifier());
1667 1308 : 383 : }
1309 : :
1310 : : /*
1311 : : * Start syncing the table in the sync worker.
1312 : : *
1313 : : * If nothing needs to be done to sync the table, we exit the worker without
1314 : : * any further action.
1315 : : *
 1316 : : * The returned slot name is palloc'ed in the current memory context.
1317 : : */
1318 : : static char *
3089 peter_e@gmx.net 1319 : 195 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1320 : : {
1321 : : char *slotname;
1322 : : char *err;
1323 : : char relstate;
1324 : : XLogRecPtr relstate_lsn;
1325 : : Relation rel;
1326 : : AclResult aclresult;
1327 : : WalRcvExecResult *res;
1328 : : char originname[NAMEDATALEN];
1329 : : RepOriginId originid;
1330 : : UserContext ucxt;
1331 : : bool must_use_password;
1332 : : bool run_as_owner;
1333 : :
1334 : : /* Check the state of the table synchronization. */
1335 : 195 : StartTransactionCommand();
3061 fujii@postgresql.org 1336 : 195 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1337 : 195 : MyLogicalRepWorker->relid,
1338 : : &relstate_lsn);
690 akapila@postgresql.o 1339 : 195 : CommitTransactionCommand();
1340 : :
1341 : : /* Is the use of a password mandatory? */
814 1342 [ + + ]: 386 : must_use_password = MySubscription->passwordrequired &&
690 1343 [ - + ]: 191 : !MySubscription->ownersuperuser;
1344 : :
3089 peter_e@gmx.net 1345 [ - + ]: 195 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
3061 fujii@postgresql.org 1346 : 195 : MyLogicalRepWorker->relstate = relstate;
1347 : 195 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
3089 peter_e@gmx.net 1348 : 195 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1349 : :
1350 : : /*
1351 : : * If synchronization is already done or no longer necessary, exit now
1352 : : * that we've updated shared memory state.
1353 : : */
1787 alvherre@alvh.no-ip. 1354 [ - + ]: 195 : switch (relstate)
1355 : : {
1787 alvherre@alvh.no-ip. 1356 :UBC 0 : case SUBREL_STATE_SYNCDONE:
1357 : : case SUBREL_STATE_READY:
1358 : : case SUBREL_STATE_UNKNOWN:
1359 : 0 : finish_sync_worker(); /* doesn't return */
1360 : : }
1361 : :
1362 : : /* Calculate the name of the tablesync slot. */
1664 akapila@postgresql.o 1363 :CBC 195 : slotname = (char *) palloc(NAMEDATALEN);
1364 : 195 : ReplicationSlotNameForTablesync(MySubscription->oid,
1365 : 195 : MyLogicalRepWorker->relid,
1366 : : slotname,
1367 : : NAMEDATALEN);
1368 : :
1369 : : /*
1370 : : * Here we use the slot name instead of the subscription name as the
1371 : : * application_name, so that it is different from the leader apply worker,
1372 : : * so that synchronous replication can distinguish them.
1373 : : */
1578 alvherre@alvh.no-ip. 1374 : 194 : LogRepWorkerWalRcvConn =
579 akapila@postgresql.o 1375 : 195 : walrcv_connect(MySubscription->conninfo, true, true,
1376 : : must_use_password,
1377 : : slotname, &err);
1578 alvherre@alvh.no-ip. 1378 [ - + ]: 194 : if (LogRepWorkerWalRcvConn == NULL)
3089 peter_e@gmx.net 1379 [ # # ]:UBC 0 : ereport(ERROR,
1380 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1381 : : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1382 : : MySubscription->name, err)));
1383 : :
1787 alvherre@alvh.no-ip. 1384 [ + + - + :CBC 194 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- - ]
1385 : : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1386 : : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1387 : :
1388 : : /* Assign the origin tracking record name. */
1061 akapila@postgresql.o 1389 : 194 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1390 : 194 : MyLogicalRepWorker->relid,
1391 : : originname,
1392 : : sizeof(originname));
1393 : :
1667 1394 [ + + ]: 194 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1395 : : {
1396 : : /*
1397 : : * We have previously errored out before finishing the copy so the
1398 : : * replication slot might exist. We want to remove the slot if it
1399 : : * already exists and proceed.
1400 : : *
1401 : : * XXX We could also instead try to drop the slot, last time we failed
1402 : : * but for that, we might need to clean up the copy state as it might
1403 : : * be in the middle of fetching the rows. Also, if there is a network
1404 : : * breakdown then it wouldn't have succeeded so trying it next time
1405 : : * seems like a better bet.
1406 : : */
1578 alvherre@alvh.no-ip. 1407 : 7 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1408 : : }
1667 akapila@postgresql.o 1409 [ - + ]: 187 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1410 : : {
1411 : : /*
1412 : : * The COPY phase was previously done, but tablesync then crashed
1413 : : * before it was able to finish normally.
1414 : : */
1667 akapila@postgresql.o 1415 :UBC 0 : StartTransactionCommand();
1416 : :
1417 : : /*
1418 : : * The origin tracking name must already exist. It was created the
1419 : : * first time this tablesync was launched.
1420 : : */
1421 : 0 : originid = replorigin_by_name(originname, false);
971 1422 : 0 : replorigin_session_setup(originid, 0);
1667 1423 : 0 : replorigin_session_origin = originid;
1424 : 0 : *origin_startpos = replorigin_session_get_progress(false);
1425 : :
1426 : 0 : CommitTransactionCommand();
1427 : :
1428 : 0 : goto copy_table_done;
1429 : : }
1430 : :
1787 alvherre@alvh.no-ip. 1431 [ - + ]:CBC 194 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1432 : 194 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1433 : 194 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1434 : 194 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1435 : :
1436 : : /* Update the state and make it visible to others. */
1437 : 194 : StartTransactionCommand();
1438 : 194 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1439 : 194 : MyLogicalRepWorker->relid,
1440 : 194 : MyLogicalRepWorker->relstate,
36 akapila@postgresql.o 1441 : 194 : MyLogicalRepWorker->relstate_lsn,
1442 : : false);
1787 alvherre@alvh.no-ip. 1443 : 194 : CommitTransactionCommand();
1249 andres@anarazel.de 1444 : 194 : pgstat_report_stat(true);
1445 : :
1787 alvherre@alvh.no-ip. 1446 : 194 : StartTransactionCommand();
1447 : :
1448 : : /*
1449 : : * Use a standard write lock here. It might be better to disallow access
1450 : : * to the table while it's being synchronized, but we don't want to block
1451 : : * the main apply process, which has to open the relation with
1452 : : * RowExclusiveLock when remapping the remote relation id to the local one.
1453 : : */
1454 : 194 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1455 : :
1456 : : /*
1457 : : * Start a transaction on the remote node in REPEATABLE READ mode. This
1458 : : * ensures that both the replication slot we create (see below) and the
1459 : : * COPY are consistent with each other.
1460 : : */
1578 1461 : 194 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1462 : : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1463 : : 0, NULL);
1787 1464 [ - + ]: 194 : if (res->status != WALRCV_OK_COMMAND)
1787 alvherre@alvh.no-ip. 1465 [ # # ]:UBC 0 : ereport(ERROR,
1466 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1467 : : errmsg("table copy could not start transaction on publisher: %s",
1468 : : res->err)));
1787 alvherre@alvh.no-ip. 1469 :CBC 194 : walrcv_clear_result(res);
1470 : :
1471 : : /*
1472 : : * Create a new permanent logical decoding slot. This slot will be used
1473 : : * for the catchup phase after COPY is done, so tell it to use the
1474 : : * snapshot to make the final data consistent.
1475 : : */
1515 akapila@postgresql.o 1476 : 194 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1477 : : slotname, false /* permanent */ , false /* two_phase */ ,
1478 : : MySubscription->failover,
1479 : : CRS_USE_SNAPSHOT, origin_startpos);
1480 : :
1481 : : /*
1482 : : * Set up replication origin tracking. Doing this before the copy ensures
1483 : : * that an error while setting up origin tracking does not force us to
1484 : : * redo the copy.
1485 : : */
1667 1486 : 194 : originid = replorigin_by_name(originname, true);
1487 [ + - ]: 194 : if (!OidIsValid(originid))
1488 : : {
1489 : : /*
1490 : : * Origin tracking does not exist, so create it now.
1491 : : *
1492 : : * Then advance it to the LSN obtained from walrcv_create_slot. This is
1493 : : * WAL-logged for recovery purposes. The lock prevents the replication
1494 : : * origin from vanishing while we advance it.
1495 : : */
1496 : 194 : originid = replorigin_create(originname);
1497 : :
1498 : 194 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1499 : 194 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1500 : : true /* go backward */ , true /* WAL log */ );
1501 : 194 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1502 : :
971 1503 : 194 : replorigin_session_setup(originid, 0);
1667 1504 : 194 : replorigin_session_origin = originid;
1505 : : }
1506 : : else
1507 : : {
1667 akapila@postgresql.o 1508 [ # # ]:UBC 0 : ereport(ERROR,
1509 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1510 : : errmsg("replication origin \"%s\" already exists",
1511 : : originname)));
1512 : : }
1513 : :
1514 : : /*
1515 : : * Make sure that the copy command runs as the table owner, unless the
1516 : : * user has opted out of that behaviour.
1517 : : */
820 msawada@postgresql.o 1518 :CBC 194 : run_as_owner = MySubscription->runasowner;
1519 [ + + ]: 194 : if (!run_as_owner)
1520 : 193 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1521 : :
1522 : : /*
1523 : : * Check that our table sync worker has permission to insert into the
1524 : : * target table.
1525 : : */
1526 : 194 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1527 : : ACL_INSERT);
1528 [ - + ]: 194 : if (aclresult != ACLCHECK_OK)
820 msawada@postgresql.o 1529 :UBC 0 : aclcheck_error(aclresult,
1530 : 0 : get_relkind_objtype(rel->rd_rel->relkind),
1531 : 0 : RelationGetRelationName(rel));
1532 : :
1533 : : /*
1534 : : * COPY FROM does not honor RLS policies. That is not a problem for
1535 : : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1536 : : * who has it implicitly), but other roles should not be able to
1537 : : * circumvent RLS. Disallow logical replication into RLS enabled
1538 : : * relations for such roles.
1539 : : */
820 msawada@postgresql.o 1540 [ - + ]:CBC 194 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
820 msawada@postgresql.o 1541 [ # # ]:UBC 0 : ereport(ERROR,
1542 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1543 : : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1544 : : GetUserNameFromId(GetUserId(), true),
1545 : : RelationGetRelationName(rel))));
1546 : :
1547 : : /* Now do the initial data copy */
1248 tomas.vondra@postgre 1548 :CBC 194 : PushActiveSnapshot(GetTransactionSnapshot());
1549 : 194 : copy_table(rel);
1550 : 183 : PopActiveSnapshot();
1551 : :
1578 alvherre@alvh.no-ip. 1552 : 183 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1787 1553 [ - + ]: 183 : if (res->status != WALRCV_OK_COMMAND)
1787 alvherre@alvh.no-ip. 1554 [ # # ]:UBC 0 : ereport(ERROR,
1555 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1556 : : errmsg("table copy could not finish transaction on publisher: %s",
1557 : : res->err)));
1787 alvherre@alvh.no-ip. 1558 :CBC 183 : walrcv_clear_result(res);
1559 : :
809 tgl@sss.pgh.pa.us 1560 [ + + ]: 183 : if (!run_as_owner)
820 msawada@postgresql.o 1561 : 182 : RestoreUserContext(&ucxt);
1562 : :
1787 alvherre@alvh.no-ip. 1563 : 183 : table_close(rel, NoLock);
1564 : :
1565 : : /* Make the copy visible. */
1566 : 183 : CommandCounterIncrement();
1567 : :
1568 : : /*
1569 : : * Update the persisted state to indicate the COPY phase is done; make it
1570 : : * visible to others.
1571 : : */
1667 akapila@postgresql.o 1572 : 183 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1573 : 183 : MyLogicalRepWorker->relid,
1574 : : SUBREL_STATE_FINISHEDCOPY,
36 1575 : 183 : MyLogicalRepWorker->relstate_lsn,
1576 : : false);
1577 : :
1667 1578 : 183 : CommitTransactionCommand();
1579 : :
1580 : 183 : copy_table_done:
1581 : :
1582 [ - + ]: 183 : elog(DEBUG1,
1583 : : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1584 : : originname, LSN_FORMAT_ARGS(*origin_startpos));
1585 : :
1586 : : /*
1587 : : * We are done with the initial data synchronization, so update the state.
1588 : : */
1787 alvherre@alvh.no-ip. 1589 [ - + ]: 183 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1590 : 183 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1591 : 183 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1592 : 183 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1593 : :
1594 : : /*
1595 : : * Finally, wait until the leader apply worker tells us to catch up and
1596 : : * then return to let LogicalRepApplyLoop do it.
1597 : : */
1598 : 183 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
3089 peter_e@gmx.net 1599 : 183 : return slotname;
1600 : : }
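
The routine above derives both the tablesync slot name (ReplicationSlotNameForTablesync) and the replication origin name (ReplicationOriginNameForLogicalRep) purely from the subscription OID, the relation OID and, for the slot, the subscriber's system identifier, which is what keeps the names stable across tablesync worker restarts and recoverable after a crash in the DATASYNC or FINISHEDCOPY state. The standalone C sketch below illustrates that naming scheme; the format strings and the example OID/system-identifier values are assumptions for illustration only, not the authoritative definitions (those live in the helpers called above).

/*
 * Minimal standalone sketch (not backend code) of the naming scheme the
 * helpers above are understood to follow.  The formats and sample values
 * are assumptions for illustration; consult the real helpers for the
 * authoritative definitions.
 */
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

#define NAMEDATALEN 64

/* assumed format: pg_<suboid>_sync_<relid>_<subscriber system identifier> */
static void
sketch_tablesync_slot_name(char *buf, size_t szbuf,
                           uint32_t suboid, uint32_t relid, uint64_t sysid)
{
    snprintf(buf, szbuf, "pg_%" PRIu32 "_sync_%" PRIu32 "_%" PRIu64,
             suboid, relid, sysid);
}

/* assumed format: pg_<suboid>_<relid> for tablesync workers */
static void
sketch_tablesync_origin_name(char *buf, size_t szbuf,
                             uint32_t suboid, uint32_t relid)
{
    snprintf(buf, szbuf, "pg_%" PRIu32 "_%" PRIu32, suboid, relid);
}

int
main(void)
{
    char slotname[NAMEDATALEN];
    char originname[NAMEDATALEN];

    /* example OIDs and system identifier; purely made-up values */
    sketch_tablesync_slot_name(slotname, sizeof(slotname),
                               16394, 16384, UINT64_C(7305829254721452066));
    sketch_tablesync_origin_name(originname, sizeof(originname),
                                 16394, 16384);

    printf("slot:   %s\n", slotname);
    printf("origin: %s\n", originname);
    return 0;
}
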
1601 : :
1602 : : /*
1603 : : * Common code to fetch the up-to-date sync state info into the static lists.
1604 : : *
1605 : : * Returns true if the subscription has one or more tables, else false.
1606 : : *
1607 : : * Note: If this function started the transaction (indicated by the parameter)
1608 : : * then it is the caller's responsibility to commit it.
1609 : : */
1610 : : static bool
1515 akapila@postgresql.o 1611 : 3684 : FetchTableStates(bool *started_tx)
1612 : : {
1613 : : static bool has_subrels = false;
1614 : :
1615 : 3684 : *started_tx = false;
1616 : :
499 1617 [ + + ]: 3684 : if (table_states_validity != SYNC_TABLE_STATE_VALID)
1618 : : {
1619 : : MemoryContext oldctx;
1620 : : List *rstates;
1621 : : ListCell *lc;
1622 : : SubscriptionRelState *rstate;
1623 : :
1624 : 887 : table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
1625 : :
1626 : : /* Clean the old lists. */
1515 1627 : 887 : list_free_deep(table_states_not_ready);
1628 : 887 : table_states_not_ready = NIL;
1629 : :
1630 [ + + ]: 887 : if (!IsTransactionState())
1631 : : {
1632 : 871 : StartTransactionCommand();
1633 : 871 : *started_tx = true;
1634 : : }
1635 : :
1636 : : /* Fetch all non-ready tables. */
1137 michael@paquier.xyz 1637 : 887 : rstates = GetSubscriptionRelations(MySubscription->oid, true);
1638 : :
1639 : : /* Allocate the tracking info in a permanent memory context. */
1515 akapila@postgresql.o 1640 : 887 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
1641 [ + + + + : 2428 : foreach(lc, rstates)
+ + ]
1642 : : {
1643 : 1541 : rstate = palloc(sizeof(SubscriptionRelState));
1644 : 1541 : memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
1645 : 1541 : table_states_not_ready = lappend(table_states_not_ready, rstate);
1646 : : }
1647 : 887 : MemoryContextSwitchTo(oldctx);
1648 : :
1649 : : /*
1650 : : * Does the subscription have tables?
1651 : : *
1652 : : * If any not-READY relations were found then we know it does. But if
1653 : : * table_states_not_ready was empty, we still need to check again to
1654 : : * see whether the subscription has any tables at all.
1655 : : */
1116 tgl@sss.pgh.pa.us 1656 [ + + + + ]: 1128 : has_subrels = (table_states_not_ready != NIL) ||
1515 akapila@postgresql.o 1657 : 241 : HasSubscriptionRelations(MySubscription->oid);
1658 : :
1659 : : /*
1660 : : * If the subscription relation cache has been invalidated since we
1661 : : * entered this routine, we still use and return the relations we just
1662 : : * finished constructing, to avoid infinite loops, but we leave the
1663 : : * table states marked as stale so that we'll rebuild them again on the
1664 : : * next access. Otherwise, we mark the table states as valid.
1665 : : */
499 1666 [ + - ]: 887 : if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
1667 : 887 : table_states_validity = SYNC_TABLE_STATE_VALID;
1668 : : }
1669 : :
1515 1670 : 3684 : return has_subrels;
1671 : : }
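
FetchTableStates guards against a cache invalidation arriving while the list is being rebuilt by using a three-valued validity flag: the rebuild marks the state REBUILD_STARTED and only promotes it to VALID if nothing reset it to stale in the meantime, while still returning the freshly built list so callers cannot loop forever. Below is a minimal standalone sketch of that pattern under assumed names; in the backend the "invalidation" is driven by a system cache callback rather than a direct call.

/*
 * Standalone sketch of the stale / rebuild-started / valid pattern used by
 * FetchTableStates.  All names here are illustrative.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
    CACHE_STATE_STALE,              /* must be rebuilt before use */
    CACHE_STATE_REBUILD_STARTED,    /* rebuild in progress */
    CACHE_STATE_VALID               /* safe to use as-is */
} CacheValidity;

static CacheValidity cache_validity = CACHE_STATE_STALE;

/* what an invalidation callback would do: just mark the cache stale */
static void
invalidation_callback(void)
{
    cache_validity = CACHE_STATE_STALE;
}

/* rebuild the cached state if it is not known to be valid */
static void
fetch_states(bool simulate_concurrent_invalidation)
{
    if (cache_validity != CACHE_STATE_VALID)
    {
        cache_validity = CACHE_STATE_REBUILD_STARTED;

        /* ... rebuild the cached lists here ... */

        if (simulate_concurrent_invalidation)
            invalidation_callback();

        /*
         * Only mark the cache valid if no invalidation arrived while we were
         * rebuilding; otherwise leave it stale so the next call rebuilds it,
         * but still use what we just built to avoid looping forever.
         */
        if (cache_validity == CACHE_STATE_REBUILD_STARTED)
            cache_validity = CACHE_STATE_VALID;
    }
}

int
main(void)
{
    fetch_states(true);
    printf("after racy rebuild:  %s\n",
           cache_validity == CACHE_STATE_VALID ? "VALID" : "STALE");
    fetch_states(false);
    printf("after clean rebuild: %s\n",
           cache_validity == CACHE_STATE_VALID ? "VALID" : "STALE");
    return 0;
}
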
1672 : :
1673 : : /*
1674 : : * Execute the initial sync with error handling. Disable the subscription
1675 : : * if required.
1676 : : *
1677 : : * On return, the slot name is allocated in a long-lived context. Note that
1678 : : * we don't handle FATAL errors, which are probably caused by system resource
1679 : : * errors and are not repeatable.
1680 : : */
1681 : : static void
765 1682 : 195 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1683 : : {
1684 : 195 : char *sync_slotname = NULL;
1685 : :
1686 [ - + ]: 195 : Assert(am_tablesync_worker());
1687 : :
1688 [ + + ]: 195 : PG_TRY();
1689 : : {
1690 : : /* Call initial sync. */
1691 : 195 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1692 : : }
1693 : 11 : PG_CATCH();
1694 : : {
1695 [ + + ]: 11 : if (MySubscription->disableonerr)
1696 : 1 : DisableSubscriptionAndExit();
1697 : : else
1698 : : {
1699 : : /*
1700 : : * Report that the worker failed during table synchronization. Abort
1701 : : * the current transaction so that the stats message is sent in an
1702 : : * idle state.
1703 : : */
1704 : 10 : AbortOutOfAnyTransaction();
1705 : 10 : pgstat_report_subscription_error(MySubscription->oid, false);
1706 : :
1707 : 10 : PG_RE_THROW();
1708 : : }
1709 : : }
1710 [ - + ]: 183 : PG_END_TRY();
1711 : :
1712 : : /* allocate slot name in long-lived context */
1713 : 183 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1714 : 183 : pfree(sync_slotname);
1715 : 183 : }
1716 : :
1717 : : /*
1718 : : * Runs the tablesync worker.
1719 : : *
1720 : : * It starts syncing tables. After a successful sync, it sets streaming
1721 : : * options and starts streaming to catch up with the apply worker.
1722 : : */
1723 : : static void
1724 : 195 : run_tablesync_worker()
1725 : : {
1726 : : char originname[NAMEDATALEN];
1727 : 195 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1728 : 195 : char *slotname = NULL;
1729 : : WalRcvStreamOptions options;
1730 : :
1731 : 195 : start_table_sync(&origin_startpos, &slotname);
1732 : :
1733 : 183 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1734 : 183 : MyLogicalRepWorker->relid,
1735 : : originname,
1736 : : sizeof(originname));
1737 : :
1738 : 183 : set_apply_error_context_origin(originname);
1739 : :
1740 : 183 : set_stream_options(&options, slotname, &origin_startpos);
1741 : :
1742 : 183 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1743 : :
1744 : : /* Apply the changes until we catch up with the apply worker. */
1745 : 183 : start_apply(origin_startpos);
765 akapila@postgresql.o 1746 :UBC 0 : }
1747 : :
1748 : : /* Logical Replication Tablesync worker entry point */
1749 : : void
765 akapila@postgresql.o 1750 :CBC 198 : TablesyncWorkerMain(Datum main_arg)
1751 : : {
1752 : 198 : int worker_slot = DatumGetInt32(main_arg);
1753 : :
1754 : 198 : SetupApplyOrSyncWorker(worker_slot);
1755 : :
1756 : 195 : run_tablesync_worker();
1757 : :
765 akapila@postgresql.o 1758 :UBC 0 : finish_sync_worker();
1759 : : }
1760 : :
1761 : : /*
1762 : : * If the subscription has no tables then return false.
1763 : : *
1764 : : * Otherwise, return whether all tablesyncs have reached the READY state.
1765 : : *
1766 : : * Note: This function is not suitable to be called from outside of apply or
1767 : : * tablesync workers because MySubscription needs to be already initialized.
1768 : : */
1769 : : bool
1515 akapila@postgresql.o 1770 :CBC 125 : AllTablesyncsReady(void)
1771 : : {
1772 : 125 : bool started_tx = false;
1773 : 125 : bool has_subrels = false;
1774 : :
1775 : : /* We need up-to-date sync state info for subscription tables here. */
1776 : 125 : has_subrels = FetchTableStates(&started_tx);
1777 : :
1778 [ + + ]: 125 : if (started_tx)
1779 : : {
1780 : 18 : CommitTransactionCommand();
1249 andres@anarazel.de 1781 : 18 : pgstat_report_stat(true);
1782 : : }
1783 : :
1784 : : /*
1785 : : * Return false when there are no tables in the subscription or not all
1786 : : * tables are in the READY state; true otherwise.
1787 : : */
1116 tgl@sss.pgh.pa.us 1788 [ + - + + ]: 125 : return has_subrels && (table_states_not_ready == NIL);
1789 : : }
1790 : :
1791 : : /*
1792 : : * Update the two_phase state of the specified subscription in pg_subscription.
1793 : : */
1794 : : void
1515 akapila@postgresql.o 1795 : 10 : UpdateTwoPhaseState(Oid suboid, char new_state)
1796 : : {
1797 : : Relation rel;
1798 : : HeapTuple tup;
1799 : : bool nulls[Natts_pg_subscription];
1800 : : bool replaces[Natts_pg_subscription];
1801 : : Datum values[Natts_pg_subscription];
1802 : :
1803 [ + - + - : 10 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
- + ]
1804 : : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1805 : : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1806 : :
1807 : 10 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1808 : 10 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1809 [ - + ]: 10 : if (!HeapTupleIsValid(tup))
1515 akapila@postgresql.o 1810 [ # # ]:UBC 0 : elog(ERROR,
1811 : : "cache lookup failed for subscription oid %u",
1812 : : suboid);
1813 : :
1814 : : /* Form a new tuple. */
1515 akapila@postgresql.o 1815 :CBC 10 : memset(values, 0, sizeof(values));
1816 : 10 : memset(nulls, false, sizeof(nulls));
1817 : 10 : memset(replaces, false, sizeof(replaces));
1818 : :
1819 : : /* And update/set two_phase state */
1820 : 10 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1821 : 10 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1822 : :
1823 : 10 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1824 : : values, nulls, replaces);
1825 : 10 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1826 : :
1827 : 10 : heap_freetuple(tup);
1828 : 10 : table_close(rel, RowExclusiveLock);
1829 : 10 : }
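
UpdateTwoPhaseState is a bare catalog update, so the caller is expected to provide the surrounding transaction and decide when the transition is allowed. The sketch below shows the kind of caller that pairs it with AllTablesyncsReady(): the leader apply worker only flips the subscription's two_phase state from pending to enabled once every tablesync has reached READY. This is a simplified, illustrative fragment under assumed header locations; the real logic lives in the apply worker and also handles restarting streaming so that two-phase decoding takes effect, which is omitted here.

/*
 * Illustrative fragment only (not the apply worker's actual code path):
 * how a caller gates the two_phase transition on tablesync completion.
 * Header names are assumptions based on current sources.
 */
#include "postgres.h"

#include "access/xact.h"
#include "catalog/pg_subscription.h"
#include "replication/worker_internal.h"

static void
maybe_enable_twophase_sketch(void)
{
    if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
        AllTablesyncsReady())
    {
        /* the catalog update must run inside a transaction */
        StartTransactionCommand();
        UpdateTwoPhaseState(MySubscription->oid,
                            LOGICALREP_TWOPHASE_STATE_ENABLED);
        MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
        CommitTransactionCommand();
    }
}
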