Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * tablesync.c
3 : : * PostgreSQL logical replication: initial table data synchronization
4 : : *
5 : : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/tablesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for initial table data synchronization for
12 : : * logical replication.
13 : : *
14 : : * The initial data synchronization is done separately for each table,
15 : : * in a separate apply worker that only fetches the initial snapshot data
16 : : * from the publisher and then synchronizes the position in the stream with
17 : : * the leader apply worker.
18 : : *
19 : : * There are several reasons for doing the synchronization this way:
20 : : * - It allows us to parallelize the initial data synchronization
21 : : * which lowers the time needed for it to happen.
22 : : * - The initial synchronization does not have to hold the xid and LSN
23 : : * for the time it takes to copy data of all tables, causing less
24 : : * bloat and lower disk consumption compared to doing the
25 : : * synchronization in a single process for the whole database.
26 : : * - It allows us to synchronize any tables added after the initial
27 : : * synchronization has finished.
28 : : *
29 : : * The stream position synchronization works in multiple steps:
30 : : * - Apply worker requests a tablesync worker to start, setting the new
31 : : * table state to INIT.
32 : : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : : * copying.
34 : : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : : * worker specific) state to indicate when the copy phase has completed, so
36 : : * if the worker crashes with this (non-memory) state then the copy will not
37 : : * be re-attempted.
38 : : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : : * until either the table state is set to SYNCDONE or the sync worker
42 : : * exits.
43 : : * - After the sync worker has seen the state change to CATCHUP, it will
44 : : * read the stream and apply changes (acting like an apply worker) until
45 : : * it catches up to the specified stream position. Then it sets the
46 : : * state to SYNCDONE. There might be zero changes applied between
47 : : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : : * apply worker.
49 : : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : : * the table until it reaches the SYNCDONE stream position, at which
51 : : * point it sets state to READY and stops tracking. Again, there might
52 : : * be zero changes in between.
53 : : *
54 : : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : : *
57 : : * The catalog pg_subscription_rel is used to keep information about
58 : : * subscribed tables and their state. The catalog holds all states
59 : : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : : *
61 : : * Example flows look like this:
62 : : * - Apply is in front:
63 : : * sync:8
64 : : * -> set in catalog FINISHEDCOPY
65 : : * -> set in memory SYNCWAIT
66 : : * apply:10
67 : : * -> set in memory CATCHUP
68 : : * -> enter wait-loop
69 : : * sync:10
70 : : * -> set in catalog SYNCDONE
71 : : * -> exit
72 : : * apply:10
73 : : * -> exit wait-loop
74 : : * -> continue rep
75 : : * apply:11
76 : : * -> set in catalog READY
77 : : *
78 : : * - Sync is in front:
79 : : * sync:10
80 : : * -> set in catalog FINISHEDCOPY
81 : : * -> set in memory SYNCWAIT
82 : : * apply:8
83 : : * -> set in memory CATCHUP
84 : : * -> continue per-table filtering
85 : : * sync:10
86 : : * -> set in catalog SYNCDONE
87 : : * -> exit
88 : : * apply:10
89 : : * -> set in catalog READY
90 : : * -> stop per-table filtering
91 : : * -> continue rep
92 : : *-------------------------------------------------------------------------
93 : : */
94 : :
95 : : #include "postgres.h"
96 : :
97 : : #include "access/table.h"
98 : : #include "access/xact.h"
99 : : #include "catalog/indexing.h"
100 : : #include "catalog/pg_subscription_rel.h"
101 : : #include "catalog/pg_type.h"
102 : : #include "commands/copy.h"
103 : : #include "miscadmin.h"
104 : : #include "nodes/makefuncs.h"
105 : : #include "parser/parse_relation.h"
106 : : #include "pgstat.h"
107 : : #include "replication/logicallauncher.h"
108 : : #include "replication/logicalrelation.h"
109 : : #include "replication/logicalworker.h"
110 : : #include "replication/origin.h"
111 : : #include "replication/slot.h"
112 : : #include "replication/walreceiver.h"
113 : : #include "replication/worker_internal.h"
114 : : #include "storage/ipc.h"
115 : : #include "storage/lmgr.h"
116 : : #include "utils/acl.h"
117 : : #include "utils/array.h"
118 : : #include "utils/builtins.h"
119 : : #include "utils/lsyscache.h"
120 : : #include "utils/rls.h"
121 : : #include "utils/snapmgr.h"
122 : : #include "utils/syscache.h"
123 : : #include "utils/usercontext.h"
124 : :
/*
 * List of per-table sync states that ProcessSyncingTablesForApply() walks
 * (its elements are read as SubscriptionRelState).  NOTE(review): presumably
 * refreshed by FetchRelationStates() and limited to tables that are not yet
 * READY -- confirm against that function.
 */
List *table_states_not_ready = NIL;

/*
 * Buffer holding the data chunk most recently received from the publisher
 * during COPY; copy_read_data() consumes it through its cursor field.
 */
static StringInfo copybuf = NULL;
128 : :
129 : : /*
130 : : * Wait until the relation sync state is set in the catalog to the expected
131 : : * one; return true when it happens.
132 : : *
133 : : * Returns false if the table sync worker or the table itself have
134 : : * disappeared, or the table state has been reset.
135 : : *
136 : : * Currently, this is used in the apply worker when transitioning from
137 : : * CATCHUP state to SYNCDONE.
138 : : */
139 : : static bool
12 akapila@postgresql.o 140 :GNC 182 : wait_for_table_state_change(Oid relid, char expected_state)
141 : : {
142 : : char state;
143 : :
144 : : for (;;)
3141 peter_e@gmx.net 145 :CBC 204 : {
146 : : LogicalRepWorker *worker;
147 : : XLogRecPtr statelsn;
148 : :
3070 149 [ - + ]: 386 : CHECK_FOR_INTERRUPTS();
150 : :
1839 alvherre@alvh.no-ip. 151 : 386 : InvalidateCatalogSnapshot();
3066 peter_e@gmx.net 152 : 386 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
153 : : relid, &statelsn);
154 : :
155 [ - + ]: 386 : if (state == SUBREL_STATE_UNKNOWN)
1839 alvherre@alvh.no-ip. 156 :UBC 0 : break;
157 : :
3066 peter_e@gmx.net 158 [ + + ]:CBC 386 : if (state == expected_state)
159 : 182 : return true;
160 : :
161 : : /* Check if the sync worker is still running and bail if not. */
3141 162 : 204 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
1839 alvherre@alvh.no-ip. 163 : 204 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
164 : : false);
3066 peter_e@gmx.net 165 : 204 : LWLockRelease(LogicalRepWorkerLock);
3141 166 [ - + ]: 204 : if (!worker)
1839 alvherre@alvh.no-ip. 167 :LBC (1) : break;
168 : :
2531 tmunro@postgresql.or 169 :CBC 204 : (void) WaitLatch(MyLatch,
170 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
171 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
172 : :
3066 andres@anarazel.de 173 : 204 : ResetLatch(MyLatch);
174 : : }
175 : :
3066 peter_e@gmx.net 176 :LBC (1) : return false;
177 : : }
178 : :
179 : : /*
180 : : * Wait until the apply worker changes the state of our synchronization
181 : : * worker to the expected one.
182 : : *
183 : : * Used when transitioning from SYNCWAIT state to CATCHUP.
184 : : *
185 : : * Returns false if the apply worker has disappeared.
186 : : */
187 : : static bool
3066 peter_e@gmx.net 188 :CBC 184 : wait_for_worker_state_change(char expected_state)
189 : : {
190 : : int rc;
191 : :
192 : : for (;;)
193 : 184 : {
194 : : LogicalRepWorker *worker;
195 : :
196 [ - + ]: 368 : CHECK_FOR_INTERRUPTS();
197 : :
198 : : /*
199 : : * Done if already in correct state. (We assume this fetch is atomic
200 : : * enough to not give a misleading answer if we do it with no lock.)
201 : : */
3042 tgl@sss.pgh.pa.us 202 [ + + ]: 368 : if (MyLogicalRepWorker->relstate == expected_state)
203 : 184 : return true;
204 : :
205 : : /*
206 : : * Bail out if the apply worker has died, else signal it we're
207 : : * waiting.
208 : : */
3066 peter_e@gmx.net 209 : 184 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
210 : 184 : worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
211 : : InvalidOid, false);
3042 tgl@sss.pgh.pa.us 212 [ + - + - ]: 184 : if (worker && worker->proc)
213 : 184 : logicalrep_worker_wakeup_ptr(worker);
3066 peter_e@gmx.net 214 : 184 : LWLockRelease(LogicalRepWorkerLock);
215 [ - + ]: 184 : if (!worker)
3042 tgl@sss.pgh.pa.us 216 :UBC 0 : break;
217 : :
218 : : /*
219 : : * Wait. We expect to get a latch signal back from the apply worker,
220 : : * but use a timeout in case it dies without sending one.
221 : : */
3066 andres@anarazel.de 222 :CBC 184 : rc = WaitLatch(MyLatch,
223 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
224 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
225 : :
3042 tgl@sss.pgh.pa.us 226 [ + - ]: 184 : if (rc & WL_LATCH_SET)
227 : 184 : ResetLatch(MyLatch);
228 : : }
229 : :
3141 peter_e@gmx.net 230 :UBC 0 : return false;
231 : : }
232 : :
233 : : /*
234 : : * Handle table synchronization cooperation from the synchronization
235 : : * worker.
236 : : *
237 : : * If the sync worker is in CATCHUP state and reached (or passed) the
238 : : * predetermined synchronization point in the WAL stream, mark the table as
239 : : * SYNCDONE and finish.
240 : : */
241 : : void
12 akapila@postgresql.o 242 :GNC 197 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
243 : : {
3141 peter_e@gmx.net 244 [ - + ]:CBC 197 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
245 : :
246 [ + - ]: 197 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
247 [ + + ]: 197 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
248 : : {
249 : : TimeLineID tli;
1719 akapila@postgresql.o 250 : 184 : char syncslotname[NAMEDATALEN] = {0};
1155 251 : 184 : char originname[NAMEDATALEN] = {0};
252 : :
3066 peter_e@gmx.net 253 : 184 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
3141 254 : 184 : MyLogicalRepWorker->relstate_lsn = current_lsn;
255 : :
256 : 184 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
257 : :
258 : : /*
259 : : * UpdateSubscriptionRelState must be called within a transaction.
260 : : */
1719 akapila@postgresql.o 261 [ + - ]: 184 : if (!IsTransactionState())
262 : 184 : StartTransactionCommand();
263 : :
2762 peter_e@gmx.net 264 : 184 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
265 : 184 : MyLogicalRepWorker->relid,
266 : 184 : MyLogicalRepWorker->relstate,
88 akapila@postgresql.o 267 : 184 : MyLogicalRepWorker->relstate_lsn,
268 : : false);
269 : :
270 : : /*
271 : : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
272 : : * the slot.
273 : : */
1630 alvherre@alvh.no-ip. 274 : 184 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
275 : :
276 : : /*
277 : : * Cleanup the tablesync slot.
278 : : *
279 : : * This has to be done after updating the state because otherwise if
280 : : * there is an error while doing the database operations we won't be
281 : : * able to rollback dropped slot.
282 : : */
1719 akapila@postgresql.o 283 : 184 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
284 : 184 : MyLogicalRepWorker->relid,
285 : : syncslotname,
286 : : sizeof(syncslotname));
287 : :
288 : : /*
289 : : * It is important to give an error if we are unable to drop the slot,
290 : : * otherwise, it won't be dropped till the corresponding subscription
291 : : * is dropped. So passing missing_ok = false.
292 : : */
1630 alvherre@alvh.no-ip. 293 : 184 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
294 : :
1142 akapila@postgresql.o 295 : 184 : CommitTransactionCommand();
296 : 184 : pgstat_report_stat(false);
297 : :
298 : : /*
299 : : * Start a new transaction to clean up the tablesync origin tracking.
300 : : * This transaction will be ended within the FinishSyncWorker(). Now,
301 : : * even, if we fail to remove this here, the apply worker will ensure
302 : : * to clean it up afterward.
303 : : *
304 : : * We need to do this after the table state is set to SYNCDONE.
305 : : * Otherwise, if an error occurs while performing the database
306 : : * operation, the worker will be restarted and the in-memory state of
307 : : * replication progress (remote_lsn) won't be rolled-back which would
308 : : * have been cleared before restart. So, the restarted worker will use
309 : : * invalid replication progress state resulting in replay of
310 : : * transactions that have already been applied.
311 : : */
312 : 184 : StartTransactionCommand();
313 : :
1113 314 : 184 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
315 : 184 : MyLogicalRepWorker->relid,
316 : : originname,
317 : : sizeof(originname));
318 : :
319 : : /*
320 : : * Resetting the origin session removes the ownership of the slot.
321 : : * This is needed to allow the origin to be dropped.
322 : : */
1142 323 : 184 : replorigin_session_reset();
324 : 184 : replorigin_session_origin = InvalidRepOriginId;
325 : 184 : replorigin_session_origin_lsn = InvalidXLogRecPtr;
326 : 184 : replorigin_session_origin_timestamp = 0;
327 : :
328 : : /*
329 : : * Drop the tablesync's origin tracking if exists.
330 : : *
331 : : * There is a chance that the user is concurrently performing refresh
332 : : * for the subscription where we remove the table state and its origin
333 : : * or the apply worker would have removed this origin. So passing
334 : : * missing_ok = true.
335 : : */
336 : 184 : replorigin_drop_by_name(originname, true, false);
337 : :
12 akapila@postgresql.o 338 :GNC 184 : FinishSyncWorker();
339 : : }
340 : : else
3141 peter_e@gmx.net 341 :CBC 13 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
342 : 13 : }
343 : :
344 : : /*
345 : : * Handle table synchronization cooperation from the apply worker.
346 : : *
347 : : * Walk over all subscription tables that are individually tracked by the
348 : : * apply process (currently, all that have state other than
349 : : * SUBREL_STATE_READY) and manage synchronization for them.
350 : : *
351 : : * If there are tables that need synchronizing and are not being synchronized
352 : : * yet, start sync workers for them (if there are free slots for sync
353 : : * workers). To prevent starting the sync worker for the same relation at a
354 : : * high frequency after a failure, we store its last start time with each sync
355 : : * state info. We start the sync worker for the same relation after waiting
356 : : * at least wal_retrieve_retry_interval.
357 : : *
358 : : * For tables that are being synchronized already, check if sync workers
359 : : * either need action from the apply worker or have finished. This is the
360 : : * SYNCWAIT to CATCHUP transition.
361 : : *
362 : : * If the synchronization position is reached (SYNCDONE), then the table can
363 : : * be marked as READY and is no longer tracked.
364 : : */
365 : : void
12 akapila@postgresql.o 366 :GNC 3032 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
367 : : {
368 : : struct tablesync_start_time_mapping
369 : : {
370 : : Oid relid;
371 : : TimestampTz last_start_time;
372 : : };
373 : : static HTAB *last_start_times = NULL;
374 : : ListCell *lc;
3095 peter_e@gmx.net 375 :CBC 3032 : bool started_tx = false;
1026 tgl@sss.pgh.pa.us 376 : 3032 : bool should_exit = false;
88 akapila@postgresql.o 377 : 3032 : Relation rel = NULL;
378 : :
3141 peter_e@gmx.net 379 [ - + ]: 3032 : Assert(!IsTransactionState());
380 : :
381 : : /* We need up-to-date sync state info for subscription tables here. */
12 akapila@postgresql.o 382 :GNC 3032 : FetchRelationStates(&started_tx);
383 : :
384 : : /*
385 : : * Prepare a hash table for tracking last start times of workers, to avoid
386 : : * immediate restarts. We don't need it if there are no tables that need
387 : : * syncing.
388 : : */
1168 tgl@sss.pgh.pa.us 389 [ + + + + ]:CBC 3032 : if (table_states_not_ready != NIL && !last_start_times)
3106 peter_e@gmx.net 390 : 121 : {
391 : : HASHCTL ctl;
392 : :
393 : 121 : ctl.keysize = sizeof(Oid);
394 : 121 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
395 : 121 : last_start_times = hash_create("Logical replication table sync worker start times",
396 : : 256, &ctl, HASH_ELEM | HASH_BLOBS);
397 : : }
398 : :
399 : : /*
400 : : * Clean up the hash table when we're done with all tables (just to
401 : : * release the bit of memory).
402 : : */
1168 tgl@sss.pgh.pa.us 403 [ + + + + ]: 2911 : else if (table_states_not_ready == NIL && last_start_times)
404 : : {
3106 peter_e@gmx.net 405 : 90 : hash_destroy(last_start_times);
406 : 90 : last_start_times = NULL;
407 : : }
408 : :
409 : : /*
410 : : * Process all tables that are being synchronized.
411 : : */
1567 akapila@postgresql.o 412 [ + + + + : 4863 : foreach(lc, table_states_not_ready)
+ + ]
413 : : {
3086 bruce@momjian.us 414 : 1833 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
415 : :
3141 peter_e@gmx.net 416 [ + + ]: 1833 : if (rstate->state == SUBREL_STATE_SYNCDONE)
417 : : {
418 : : /*
419 : : * Apply has caught up to the position where the table sync has
420 : : * finished. Mark the table as ready so that the apply will just
421 : : * continue to replicate it normally.
422 : : */
423 [ + + ]: 181 : if (current_lsn >= rstate->lsn)
424 : : {
425 : : char originname[NAMEDATALEN];
426 : :
427 : 180 : rstate->state = SUBREL_STATE_READY;
428 : 180 : rstate->lsn = current_lsn;
3095 429 [ + + ]: 180 : if (!started_tx)
430 : : {
431 : 9 : StartTransactionCommand();
432 : 9 : started_tx = true;
433 : : }
434 : :
435 : : /*
436 : : * Remove the tablesync origin tracking if exists.
437 : : *
438 : : * There is a chance that the user is concurrently performing
439 : : * refresh for the subscription where we remove the table
440 : : * state and its origin or the tablesync worker would have
441 : : * already removed this origin. We can't rely on tablesync
442 : : * worker to remove the origin tracking as if there is any
443 : : * error while dropping we won't restart it to drop the
444 : : * origin. So passing missing_ok = true.
445 : : *
446 : : * Lock the subscription and origin in the same order as we
447 : : * are doing during DDL commands to avoid deadlocks. See
448 : : * AlterSubscription_refresh.
449 : : */
88 akapila@postgresql.o 450 : 180 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
451 : : 0, AccessShareLock);
452 : :
453 [ + - ]: 180 : if (!rel)
454 : 180 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
455 : :
1113 456 : 180 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
457 : : rstate->relid,
458 : : originname,
459 : : sizeof(originname));
1142 460 : 180 : replorigin_drop_by_name(originname, true, false);
461 : :
462 : : /*
463 : : * Update the state to READY only after the origin cleanup.
464 : : */
2762 peter_e@gmx.net 465 : 180 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
466 : 180 : rstate->relid, rstate->state,
467 : : rstate->lsn, true);
468 : : }
469 : : }
470 : : else
471 : : {
472 : : LogicalRepWorker *syncworker;
473 : :
474 : : /*
475 : : * Look for a sync worker for this relation.
476 : : */
3141 477 : 1652 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
478 : :
479 : 1652 : syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
480 : : rstate->relid, false);
481 : :
482 [ + + ]: 1652 : if (syncworker)
483 : : {
484 : : /* Found one, update our copy of its state */
485 [ - + ]: 747 : SpinLockAcquire(&syncworker->relmutex);
486 : 747 : rstate->state = syncworker->relstate;
487 : 747 : rstate->lsn = syncworker->relstate_lsn;
3042 tgl@sss.pgh.pa.us 488 [ + + ]: 747 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
489 : : {
490 : : /*
491 : : * Sync worker is waiting for apply. Tell sync worker it
492 : : * can catchup now.
493 : : */
494 : 182 : syncworker->relstate = SUBREL_STATE_CATCHUP;
495 : 182 : syncworker->relstate_lsn =
496 : 182 : Max(syncworker->relstate_lsn, current_lsn);
497 : : }
3141 peter_e@gmx.net 498 : 747 : SpinLockRelease(&syncworker->relmutex);
499 : :
500 : : /* If we told worker to catch up, wait for it. */
3042 tgl@sss.pgh.pa.us 501 [ + + ]: 747 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
502 : : {
503 : : /* Signal the sync worker, as it may be waiting for us. */
504 [ + - ]: 182 : if (syncworker->proc)
505 : 182 : logicalrep_worker_wakeup_ptr(syncworker);
506 : :
507 : : /* Now safe to release the LWLock */
508 : 182 : LWLockRelease(LogicalRepWorkerLock);
509 : :
687 akapila@postgresql.o 510 [ + - ]: 182 : if (started_tx)
511 : : {
512 : : /*
513 : : * We must commit the existing transaction to release
514 : : * the existing locks before entering a busy loop.
515 : : * This is required to avoid any undetected deadlocks
516 : : * due to any existing lock as deadlock detector won't
517 : : * be able to detect the waits on the latch.
518 : : *
519 : : * Also close any tables prior to the commit.
520 : : */
88 521 [ + + ]: 182 : if (rel)
522 : : {
523 : 31 : table_close(rel, NoLock);
524 : 31 : rel = NULL;
525 : : }
687 526 : 182 : CommitTransactionCommand();
527 : 182 : pgstat_report_stat(false);
528 : : }
529 : :
530 : : /*
531 : : * Enter busy loop and wait for synchronization worker to
532 : : * reach expected state (or die trying).
533 : : */
534 : 182 : StartTransactionCommand();
535 : 182 : started_tx = true;
536 : :
12 akapila@postgresql.o 537 :GNC 182 : wait_for_table_state_change(rstate->relid,
538 : : SUBREL_STATE_SYNCDONE);
539 : : }
540 : : else
3042 tgl@sss.pgh.pa.us 541 :CBC 565 : LWLockRelease(LogicalRepWorkerLock);
542 : : }
543 : : else
544 : : {
545 : : /*
546 : : * If there is no sync worker for this table yet, count
547 : : * running sync workers for this subscription, while we have
548 : : * the lock.
549 : : */
550 : : int nsyncworkers =
893 551 : 905 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
552 : :
553 : : /* Now safe to release the LWLock */
3042 554 : 905 : LWLockRelease(LogicalRepWorkerLock);
555 : :
556 : : /*
557 : : * If there are free sync worker slot(s), start a new sync
558 : : * worker for the table.
559 : : */
560 [ + + ]: 905 : if (nsyncworkers < max_sync_workers_per_subscription)
561 : : {
562 : 208 : TimestampTz now = GetCurrentTimestamp();
563 : : struct tablesync_start_time_mapping *hentry;
564 : : bool found;
565 : :
566 : 208 : hentry = hash_search(last_start_times, &rstate->relid,
567 : : HASH_ENTER, &found);
568 : :
569 [ + + + + ]: 226 : if (!found ||
570 : 18 : TimestampDifferenceExceeds(hentry->last_start_time, now,
571 : : wal_retrieve_retry_interval))
572 : : {
573 : : /*
574 : : * Set the last_start_time even if we fail to start
575 : : * the worker, so that we won't retry until
576 : : * wal_retrieve_retry_interval has elapsed.
577 : : */
578 : 196 : hentry->last_start_time = now;
126 579 : 196 : (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
580 : 196 : MyLogicalRepWorker->dbid,
581 : 196 : MySubscription->oid,
582 : 196 : MySubscription->name,
583 : 196 : MyLogicalRepWorker->userid,
584 : : rstate->relid,
585 : : DSM_HANDLE_INVALID,
586 : : false);
587 : : }
588 : : }
589 : : }
590 : : }
591 : : }
592 : :
593 : : /* Close table if opened */
88 akapila@postgresql.o 594 [ + + ]: 3030 : if (rel)
595 : 149 : table_close(rel, NoLock);
596 : :
597 : :
3095 peter_e@gmx.net 598 [ + + ]: 3030 : if (started_tx)
599 : : {
600 : : /*
601 : : * Even when the two_phase mode is requested by the user, it remains
602 : : * as 'pending' until all tablesyncs have reached READY state.
603 : : *
604 : : * When this happens, we restart the apply worker and (if the
605 : : * conditions are still ok) then the two_phase tri-state will become
606 : : * 'enabled' at that time.
607 : : *
608 : : * Note: If the subscription has no tables then leave the state as
609 : : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
610 : : * work.
611 : : */
1026 tgl@sss.pgh.pa.us 612 [ + + ]: 866 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
613 : : {
614 : 25 : CommandCounterIncrement(); /* make updates visible */
615 [ + + ]: 25 : if (AllTablesyncsReady())
616 : : {
617 [ + - ]: 6 : ereport(LOG,
618 : : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
619 : : MySubscription->name)));
620 : 6 : should_exit = true;
621 : : }
622 : : }
623 : :
3095 peter_e@gmx.net 624 : 866 : CommitTransactionCommand();
1301 andres@anarazel.de 625 : 866 : pgstat_report_stat(true);
626 : : }
627 : :
1026 tgl@sss.pgh.pa.us 628 [ + + ]: 3030 : if (should_exit)
629 : : {
630 : : /*
631 : : * Reset the last-start time for this worker so that the launcher will
632 : : * restart it without waiting for wal_retrieve_retry_interval.
633 : : */
1010 634 : 6 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
635 : :
1026 636 : 6 : proc_exit(0);
637 : : }
3141 peter_e@gmx.net 638 : 3024 : }
639 : :
640 : : /*
641 : : * Create list of columns for COPY based on logical relation mapping.
642 : : */
643 : : static List *
644 : 192 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
645 : : {
646 : 192 : List *attnamelist = NIL;
647 : : int i;
648 : :
3085 649 [ + + ]: 516 : for (i = 0; i < rel->remoterel.natts; i++)
650 : : {
3141 651 : 324 : attnamelist = lappend(attnamelist,
3085 652 : 324 : makeString(rel->remoterel.attnames[i]));
653 : : }
654 : :
655 : :
3141 656 : 192 : return attnamelist;
657 : : }
658 : :
659 : : /*
660 : : * Data source callback for the COPY FROM, which reads from the remote
661 : : * connection and passes the data back to our local COPY.
662 : : */
663 : : static int
664 : 13985 : copy_read_data(void *outbuf, int minread, int maxread)
665 : : {
3086 bruce@momjian.us 666 : 13985 : int bytesread = 0;
667 : : int avail;
668 : :
669 : : /* If there are some leftover data from previous read, use it. */
3141 peter_e@gmx.net 670 : 13985 : avail = copybuf->len - copybuf->cursor;
671 [ - + ]: 13985 : if (avail)
672 : : {
3141 peter_e@gmx.net 673 [ # # ]:UBC 0 : if (avail > maxread)
674 : 0 : avail = maxread;
675 : 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
676 : 0 : copybuf->cursor += avail;
677 : 0 : maxread -= avail;
678 : 0 : bytesread += avail;
679 : : }
680 : :
3070 peter_e@gmx.net 681 [ + - + - ]:CBC 13985 : while (maxread > 0 && bytesread < minread)
682 : : {
3141 683 : 13985 : pgsocket fd = PGINVALID_SOCKET;
684 : : int len;
685 : 13985 : char *buf = NULL;
686 : :
687 : : for (;;)
688 : : {
689 : : /* Try read the data. */
1630 alvherre@alvh.no-ip. 690 : 13985 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
691 : :
3141 peter_e@gmx.net 692 [ - + ]: 13985 : CHECK_FOR_INTERRUPTS();
693 : :
694 [ - + ]: 13985 : if (len == 0)
3141 peter_e@gmx.net 695 :UBC 0 : break;
3141 peter_e@gmx.net 696 [ + + ]:CBC 13985 : else if (len < 0)
697 : 13985 : return bytesread;
698 : : else
699 : : {
700 : : /* Process the data */
701 : 13795 : copybuf->data = buf;
702 : 13795 : copybuf->len = len;
703 : 13795 : copybuf->cursor = 0;
704 : :
705 : 13795 : avail = copybuf->len - copybuf->cursor;
706 [ - + ]: 13795 : if (avail > maxread)
3141 peter_e@gmx.net 707 :UBC 0 : avail = maxread;
3141 peter_e@gmx.net 708 :CBC 13795 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
334 peter@eisentraut.org 709 : 13795 : outbuf = (char *) outbuf + avail;
3141 peter_e@gmx.net 710 : 13795 : copybuf->cursor += avail;
711 : 13795 : maxread -= avail;
712 : 13795 : bytesread += avail;
713 : : }
714 : :
715 [ + - + - ]: 13795 : if (maxread <= 0 || bytesread >= minread)
716 : 13795 : return bytesread;
717 : : }
718 : :
719 : : /*
720 : : * Wait for more data or latch.
721 : : */
2531 tmunro@postgresql.or 722 :UBC 0 : (void) WaitLatchOrSocket(MyLatch,
723 : : WL_SOCKET_READABLE | WL_LATCH_SET |
724 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
725 : : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
726 : :
3066 andres@anarazel.de 727 : 0 : ResetLatch(MyLatch);
728 : : }
729 : :
3141 peter_e@gmx.net 730 : 0 : return bytesread;
731 : : }
732 : :
733 : :
734 : : /*
735 : : * Get information about remote relation in similar fashion the RELATION
736 : : * message provides during replication.
737 : : *
738 : : * This function also returns (a) the relation qualifications to be used in
739 : : * the COPY command, and (b) whether the remote relation has published any
740 : : * generated column.
741 : : */
742 : : static void
363 akapila@postgresql.o 743 :CBC 195 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
744 : : List **qual, bool *gencol_published)
745 : : {
746 : : WalRcvExecResult *res;
747 : : StringInfoData cmd;
748 : : TupleTableSlot *slot;
2049 peter@eisentraut.org 749 : 195 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
363 akapila@postgresql.o 750 : 195 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
1344 751 : 195 : Oid qualRow[] = {TEXTOID};
752 : : bool isnull;
753 : : int natt;
368 michael@paquier.xyz 754 : 195 : StringInfo pub_names = NULL;
1312 tomas.vondra@postgre 755 : 195 : Bitmapset *included_cols = NULL;
363 akapila@postgresql.o 756 : 195 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
757 : :
3141 peter_e@gmx.net 758 : 195 : lrel->nspname = nspname;
759 : 195 : lrel->relname = relname;
760 : :
761 : : /* First fetch Oid and replica identity. */
762 : 195 : initStringInfo(&cmd);
2049 peter@eisentraut.org 763 : 195 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
764 : : " FROM pg_catalog.pg_class c"
765 : : " INNER JOIN pg_catalog.pg_namespace n"
766 : : " ON (c.relnamespace = n.oid)"
767 : : " WHERE n.nspname = %s"
768 : : " AND c.relname = %s",
769 : : quote_literal_cstr(nspname),
770 : : quote_literal_cstr(relname));
1630 alvherre@alvh.no-ip. 771 : 195 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
772 : : lengthof(tableRow), tableRow);
773 : :
3141 peter_e@gmx.net 774 [ - + ]: 195 : if (res->status != WALRCV_OK_TUPLES)
3141 peter_e@gmx.net 775 [ # # ]:UBC 0 : ereport(ERROR,
776 : : (errcode(ERRCODE_CONNECTION_FAILURE),
777 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
778 : : nspname, relname, res->err)));
779 : :
2539 andres@anarazel.de 780 :CBC 195 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3141 peter_e@gmx.net 781 [ + + ]: 195 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3141 peter_e@gmx.net 782 [ + - ]:GBC 1 : ereport(ERROR,
783 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
784 : : errmsg("table \"%s.%s\" not found on publisher",
785 : : nspname, relname)));
786 : :
3141 peter_e@gmx.net 787 :CBC 194 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
788 [ - + ]: 194 : Assert(!isnull);
789 : 194 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
790 [ - + ]: 194 : Assert(!isnull);
2049 peter@eisentraut.org 791 : 194 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
792 [ - + ]: 194 : Assert(!isnull);
793 : :
3141 peter_e@gmx.net 794 : 194 : ExecDropSingleTupleTableSlot(slot);
795 : 194 : walrcv_clear_result(res);
796 : :
797 : :
798 : : /*
799 : : * Get column lists for each relation.
800 : : *
801 : : * We need to do this before fetching info about column names and types,
802 : : * so that we can skip columns that should not be replicated.
803 : : */
363 akapila@postgresql.o 804 [ + - ]: 194 : if (server_version >= 150000)
805 : : {
806 : : WalRcvExecResult *pubres;
807 : : TupleTableSlot *tslot;
1244 808 : 194 : Oid attrsRow[] = {INT2VECTOROID};
809 : :
810 : : /* Build the pub_names comma-separated string. */
368 michael@paquier.xyz 811 : 194 : pub_names = makeStringInfo();
812 : 194 : GetPublicationsStr(MySubscription->publications, pub_names, true);
813 : :
814 : : /*
815 : : * Fetch info about column lists for the relation (from all the
816 : : * publications).
817 : : */
1312 tomas.vondra@postgre 818 : 194 : resetStringInfo(&cmd);
819 : 194 : appendStringInfo(&cmd,
820 : : "SELECT DISTINCT"
821 : : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
822 : : " THEN NULL ELSE gpt.attrs END)"
823 : : " FROM pg_publication p,"
824 : : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
825 : : " pg_class c"
826 : : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
827 : : " AND p.pubname IN ( %s )",
828 : : lrel->remoteid,
829 : : pub_names->data);
830 : :
831 : 194 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
832 : : lengthof(attrsRow), attrsRow);
833 : :
834 [ - + ]: 194 : if (pubres->status != WALRCV_OK_TUPLES)
1312 tomas.vondra@postgre 835 [ # # ]:UBC 0 : ereport(ERROR,
836 : : (errcode(ERRCODE_CONNECTION_FAILURE),
837 : : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
838 : : nspname, relname, pubres->err)));
839 : :
840 : : /*
841 : : * We don't support the case where the column list is different for
842 : : * the same table when combining publications. See comments atop
843 : : * fetch_relation_list. So there should be only one row returned.
844 : : * Although we already checked this when creating the subscription, we
845 : : * still need to check here in case the column list was changed after
846 : : * creating the subscription and before the sync worker is started.
847 : : */
1244 akapila@postgresql.o 848 [ - + ]:CBC 194 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
1244 akapila@postgresql.o 849 [ # # ]:UBC 0 : ereport(ERROR,
850 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
851 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
852 : : nspname, relname));
853 : :
854 : : /*
855 : : * Get the column list and build a single bitmap with the attnums.
856 : : *
857 : : * If we find a NULL value, it means all the columns should be
858 : : * replicated.
859 : : */
1165 drowley@postgresql.o 860 :CBC 194 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
861 [ + - ]: 194 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
862 : : {
863 : 194 : Datum cfval = slot_getattr(tslot, 1, &isnull);
864 : :
1244 akapila@postgresql.o 865 [ + + ]: 194 : if (!isnull)
866 : : {
867 : : ArrayType *arr;
868 : : int nelems;
869 : : int16 *elems;
870 : :
871 : 22 : arr = DatumGetArrayTypeP(cfval);
872 : 22 : nelems = ARR_DIMS(arr)[0];
873 [ - + ]: 22 : elems = (int16 *) ARR_DATA_PTR(arr);
874 : :
875 [ + + ]: 59 : for (natt = 0; natt < nelems; natt++)
876 : 37 : included_cols = bms_add_member(included_cols, elems[natt]);
877 : : }
878 : :
1165 drowley@postgresql.o 879 : 194 : ExecClearTuple(tslot);
880 : : }
881 : 194 : ExecDropSingleTupleTableSlot(tslot);
882 : :
1312 tomas.vondra@postgre 883 : 194 : walrcv_clear_result(pubres);
884 : : }
885 : :
886 : : /*
887 : : * Now fetch column names and types.
888 : : */
3141 peter_e@gmx.net 889 : 194 : resetStringInfo(&cmd);
200 drowley@postgresql.o 890 : 194 : appendStringInfoString(&cmd,
891 : : "SELECT a.attnum,"
892 : : " a.attname,"
893 : : " a.atttypid,"
894 : : " a.attnum = ANY(i.indkey)");
895 : :
896 : : /* Generated columns can be replicated since version 18. */
363 akapila@postgresql.o 897 [ + - ]: 194 : if (server_version >= 180000)
200 drowley@postgresql.o 898 : 194 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
899 : :
363 akapila@postgresql.o 900 [ + - ]: 388 : appendStringInfo(&cmd,
901 : : " FROM pg_catalog.pg_attribute a"
902 : : " LEFT JOIN pg_catalog.pg_index i"
903 : : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
904 : : " WHERE a.attnum > 0::pg_catalog.int2"
905 : : " AND NOT a.attisdropped %s"
906 : : " AND a.attrelid = %u"
907 : : " ORDER BY a.attnum",
908 : : lrel->remoteid,
909 [ - + ]: 194 : (server_version >= 120000 && server_version < 180000 ?
910 : : "AND a.attgenerated = ''" : ""),
911 : : lrel->remoteid);
1630 alvherre@alvh.no-ip. 912 [ + - ]: 194 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
913 : : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
914 : :
3141 peter_e@gmx.net 915 [ - + ]: 194 : if (res->status != WALRCV_OK_TUPLES)
3141 peter_e@gmx.net 916 [ # # ]:UBC 0 : ereport(ERROR,
917 : : (errcode(ERRCODE_CONNECTION_FAILURE),
918 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
919 : : nspname, relname, res->err)));
920 : :
921 : : /* We don't know the number of rows coming, so allocate enough space. */
3141 peter_e@gmx.net 922 :CBC 194 : lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
923 : 194 : lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
924 : 194 : lrel->attkeys = NULL;
925 : :
926 : : /*
927 : : * Store the columns as a list of names. Ignore those that are not
928 : : * present in the column list, if there is one.
929 : : */
930 : 194 : natt = 0;
2539 andres@anarazel.de 931 : 194 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3141 peter_e@gmx.net 932 [ + + ]: 555 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
933 : : {
934 : : char *rel_colname;
935 : : AttrNumber attnum;
936 : :
1312 tomas.vondra@postgre 937 : 361 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
938 [ - + ]: 361 : Assert(!isnull);
939 : :
940 : : /* If the column is not in the column list, skip it. */
941 [ + + + + ]: 361 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
942 : : {
943 : 31 : ExecClearTuple(slot);
944 : 31 : continue;
945 : : }
946 : :
947 : 330 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3141 peter_e@gmx.net 948 [ - + ]: 330 : Assert(!isnull);
949 : :
1312 tomas.vondra@postgre 950 : 330 : lrel->attnames[natt] = rel_colname;
951 : 330 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
3141 peter_e@gmx.net 952 [ - + ]: 330 : Assert(!isnull);
953 : :
1312 tomas.vondra@postgre 954 [ + + ]: 330 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
3141 peter_e@gmx.net 955 : 108 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
956 : :
957 : : /* Remember if the remote table has published any generated column. */
363 akapila@postgresql.o 958 [ + - + - ]: 330 : if (server_version >= 180000 && !(*gencol_published))
959 : : {
960 : 330 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
961 [ - + ]: 330 : Assert(!isnull);
962 : : }
963 : :
964 : : /* Should never happen. */
3141 peter_e@gmx.net 965 [ - + ]: 330 : if (++natt >= MaxTupleAttributeNumber)
3141 peter_e@gmx.net 966 [ # # ]:UBC 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
967 : : nspname, relname);
968 : :
3141 peter_e@gmx.net 969 :CBC 330 : ExecClearTuple(slot);
970 : : }
971 : 194 : ExecDropSingleTupleTableSlot(slot);
972 : :
973 : 194 : lrel->natts = natt;
974 : :
975 : 194 : walrcv_clear_result(res);
976 : :
977 : : /*
978 : : * Get relation's row filter expressions. DISTINCT avoids the same
979 : : * expression of a table in multiple publications from being included
980 : : * multiple times in the final expression.
981 : : *
982 : : * We need to copy the row even if it matches just one of the
983 : : * publications, so we later combine all the quals with OR.
984 : : *
985 : : * For initial synchronization, row filtering can be ignored in following
986 : : * cases:
987 : : *
988 : : * 1) one of the subscribed publications for the table hasn't specified
989 : : * any row filter
990 : : *
991 : : * 2) one of the subscribed publications has puballtables set to true
992 : : *
993 : : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
994 : : * that includes this relation
995 : : */
363 akapila@postgresql.o 996 [ + - ]: 194 : if (server_version >= 150000)
997 : : {
998 : : /* Reuse the already-built pub_names. */
368 michael@paquier.xyz 999 [ - + ]: 194 : Assert(pub_names != NULL);
1000 : :
1001 : : /* Check for row filters. */
1344 akapila@postgresql.o 1002 : 194 : resetStringInfo(&cmd);
1003 : 194 : appendStringInfo(&cmd,
1004 : : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
1005 : : " FROM pg_publication p,"
1006 : : " LATERAL pg_get_publication_tables(p.pubname) gpt"
1007 : : " WHERE gpt.relid = %u"
1008 : : " AND p.pubname IN ( %s )",
1009 : : lrel->remoteid,
1010 : : pub_names->data);
1011 : :
1012 : 194 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
1013 : :
1014 [ - + ]: 194 : if (res->status != WALRCV_OK_TUPLES)
1344 akapila@postgresql.o 1015 [ # # ]:UBC 0 : ereport(ERROR,
1016 : : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1017 : : nspname, relname, res->err)));
1018 : :
1019 : : /*
1020 : : * Multiple row filter expressions for the same table will be combined
1021 : : * by COPY using OR. If any of the filter expressions for this table
1022 : : * are null, it means the whole table will be copied. In this case it
1023 : : * is not necessary to construct a unified row filter expression at
1024 : : * all.
1025 : : */
1344 akapila@postgresql.o 1026 :CBC 194 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1027 [ + + ]: 209 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1028 : : {
1029 : 198 : Datum rf = slot_getattr(slot, 1, &isnull);
1030 : :
1031 [ + + ]: 198 : if (!isnull)
1032 : 15 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1033 : : else
1034 : : {
1035 : : /* Ignore filters and cleanup as necessary. */
1036 [ + + ]: 183 : if (*qual)
1037 : : {
1038 : 3 : list_free_deep(*qual);
1039 : 3 : *qual = NIL;
1040 : : }
1041 : 183 : break;
1042 : : }
1043 : :
1044 : 15 : ExecClearTuple(slot);
1045 : : }
1046 : 194 : ExecDropSingleTupleTableSlot(slot);
1047 : :
1048 : 194 : walrcv_clear_result(res);
368 michael@paquier.xyz 1049 : 194 : destroyStringInfo(pub_names);
1050 : : }
1051 : :
3141 peter_e@gmx.net 1052 : 194 : pfree(cmd.data);
1053 : 194 : }
1054 : :
1055 : : /*
1056 : : * Copy existing data of a table from publisher.
1057 : : *
1058 : : * Caller is responsible for locking the local relation.
 *
 * Steps, in order: fetch the publisher-side description of the table with
 * fetch_remote_table_info(), register it in the relation map, build a
 * COPY ... TO STDOUT command and send it over LogRepWorkerWalRcvConn, then
 * load the incoming rows into rel with BeginCopyFrom()/CopyFrom(), which
 * read their input through the copy_read_data callback.
 *
 * rel: the local relation the rows are copied into.
 *
 * Raises ERROR (ERRCODE_CONNECTION_FAILURE) if the publisher does not answer
 * the COPY command with WALRCV_OK_COPY_OUT.
1059 : : */
1060 : : static void
1061 : 195 : copy_table(Relation rel)
1062 : : {
1063 : : LogicalRepRelMapEntry *relmapentry;
1064 : : LogicalRepRelation lrel;
1344 akapila@postgresql.o 1065 : 195 : List *qual = NIL;
1066 : : WalRcvExecResult *res;
1067 : : StringInfoData cmd;
1068 : : CopyFromState cstate;
1069 : : List *attnamelist;
1070 : : ParseState *pstate;
950 1071 : 195 : List *options = NIL;
363 1072 : 195 : bool gencol_published = false;
1073 : :
1074 : : /* Get the publisher relation info. */
3141 peter_e@gmx.net 1075 : 195 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
363 akapila@postgresql.o 1076 : 195 : RelationGetRelationName(rel), &lrel, &qual,
1077 : : &gencol_published);
1078 : :
1079 : : /* Put the relation into relmap. */
3141 peter_e@gmx.net 1080 : 194 : logicalrep_relmap_update(&lrel);
1081 : :
1082 : : /* Map the publisher relation to local one. */
1083 : 194 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1084 [ - + ]: 192 : Assert(rel == relmapentry->localrel);
1085 : :
1086 : : /* Start copy on the publisher. */
1087 : 192 : initStringInfo(&cmd);
1088 : :
1089 : : /* Regular table with no row filter or generated columns */
363 akapila@postgresql.o 1090 [ + + + + : 192 : if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
+ + ]
1091 : : {
706 1092 : 164 : appendStringInfo(&cmd, "COPY %s",
2049 peter@eisentraut.org 1093 : 164 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1094 : :
1095 : : /* If the table has columns, then specify the columns */
706 akapila@postgresql.o 1096 [ + + ]: 164 : if (lrel.natts)
1097 : : {
1098 : 163 : appendStringInfoString(&cmd, " (");
1099 : :
1100 : : /*
1101 : : * XXX Do we need to list the columns in all cases? Maybe we're
1102 : : * replicating all columns?
1103 : : */
1104 [ + + ]: 445 : for (int i = 0; i < lrel.natts; i++)
1105 : : {
1106 [ + + ]: 282 : if (i > 0)
1107 : 119 : appendStringInfoString(&cmd, ", ");
1108 : :
1109 : 282 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1110 : : }
1111 : :
566 drowley@postgresql.o 1112 : 163 : appendStringInfoChar(&cmd, ')');
1113 : : }
1114 : :
706 akapila@postgresql.o 1115 : 164 : appendStringInfoString(&cmd, " TO STDOUT");
1116 : : }
1117 : : else
1118 : : {
1119 : : /*
1120 : : * For non-tables and tables with row filters, we need to do COPY
1121 : : * (SELECT ...), but we can't just do SELECT * because we may need to
1122 : : * copy only subset of columns including generated columns. For tables
1123 : : * with any row filters, build a SELECT query with OR'ed row filters
1124 : : * for COPY.
1125 : : *
1126 : : * We also need to use this same COPY (SELECT ...) syntax when
1127 : : * generated columns are published, because copy of generated columns
1128 : : * is not supported by the normal COPY.
1129 : : */
1839 drowley@postgresql.o 1130 : 28 : appendStringInfoString(&cmd, "COPY (SELECT ");
2049 peter@eisentraut.org 1131 [ + + ]: 70 : for (int i = 0; i < lrel.natts; i++)
1132 : : {
1133 : 42 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1134 [ + + ]: 42 : if (i < lrel.natts - 1)
1135 : 14 : appendStringInfoString(&cmd, ", ");
1136 : : }
1137 : :
1344 akapila@postgresql.o 1138 : 28 : appendStringInfoString(&cmd, " FROM ");
1139 : :
1140 : : /*
1141 : : * For regular tables, make sure we don't copy data from a child that
1142 : : * inherits the named table as those will be copied separately.
1143 : : */
1144 [ + + ]: 28 : if (lrel.relkind == RELKIND_RELATION)
1145 : 11 : appendStringInfoString(&cmd, "ONLY ");
1146 : :
1147 : 28 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1148 : : /* list of OR'ed filters */
1149 [ + + ]: 28 : if (qual != NIL)
1150 : : {
1151 : : ListCell *lc;
1152 : 11 : char *q = strVal(linitial(qual));
1153 : :
1154 : 11 : appendStringInfo(&cmd, " WHERE %s", q);
1155 [ + - + + : 12 : for_each_from(lc, qual, 1)
+ + ]
1156 : : {
1157 : 1 : q = strVal(lfirst(lc));
1158 : 1 : appendStringInfo(&cmd, " OR %s", q);
1159 : : }
/* Every filter has been appended to cmd above, so the list is no longer needed. */
1160 : 11 : list_free_deep(qual);
1161 : : }
1162 : :
1163 : 28 : appendStringInfoString(&cmd, ") TO STDOUT");
1164 : : }
1165 : :
1166 : : /*
1167 : : * Prior to v16, initial table synchronization will use text format even
1168 : : * if the binary option is enabled for a subscription.
1169 : : */
950 1170 [ + - ]: 192 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1171 [ + + ]: 192 : MySubscription->binary)
1172 : : {
/*
 * Request binary output from the publisher and record the same format in
 * options, which is handed to BeginCopyFrom() below for the local side.
 */
1173 : 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1174 : 5 : options = list_make1(makeDefElem("format",
1175 : : (Node *) makeString("binary"), -1));
1176 : : }
1177 : :
1630 alvherre@alvh.no-ip. 1178 : 192 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
/* The error report below does not use the command text, so release it first. */
3141 peter_e@gmx.net 1179 : 192 : pfree(cmd.data);
1180 [ - + ]: 192 : if (res->status != WALRCV_OK_COPY_OUT)
3141 peter_e@gmx.net 1181 [ # # ]:UBC 0 : ereport(ERROR,
1182 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1183 : : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1184 : : lrel.nspname, lrel.relname, res->err)));
3141 peter_e@gmx.net 1185 :CBC 192 : walrcv_clear_result(res);
1186 : :
/*
 * NOTE(review): copybuf is not declared in this function; presumably it is
 * the file-level buffer consumed by copy_read_data -- confirm.
 */
1187 : 192 : copybuf = makeStringInfo();
1188 : :
3116 1189 : 192 : pstate = make_parsestate(NULL);
2126 tgl@sss.pgh.pa.us 1190 : 192 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1191 : : NULL, false, false);
1192 : :
3141 peter_e@gmx.net 1193 : 192 : attnamelist = make_copy_attnamelist(relmapentry);
950 akapila@postgresql.o 1194 : 192 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1195 : :
1196 : : /* Do the copy */
3141 peter_e@gmx.net 1197 : 191 : (void) CopyFrom(cstate);
1198 : :
/* NoLock, as in logicalrep_rel_open() above: locking rel is left to the caller. */
1199 : 184 : logicalrep_rel_close(relmapentry, NoLock);
1200 : 184 : }
1201 : :
1202 : : /*
1203 : : * Determine the tablesync slot name.
1204 : : *
1205 : : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1206 : : * on slot name length. We append system_identifier to avoid slot_name
1207 : : * collision with subscriptions in other clusters. With the current scheme
1208 : : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1209 : : * length of slot_name will be 50.
1210 : : *
1211 : : * The returned slot name is stored in the supplied buffer (syncslotname) with
1212 : : * the given size.
1213 : : *
1214 : : * Note: We don't use the subscription slot name as part of tablesync slot name
1215 : : * because we are responsible for cleaning up these slots and it could become
1216 : : * impossible to recalculate what name to cleanup if the subscription slot name
1217 : : * had changed.
1218 : : */
1219 : : void
1719 akapila@postgresql.o 1220 : 384 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1221 : : char *syncslotname, Size szslot)
1222 : : {
1716 1223 : 384 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1224 : : relid, GetSystemIdentifier());
1719 1225 : 384 : }
1226 : :
1227 : : /*
1228 : : * Start syncing the table in the sync worker.
1229 : : *
1230 : : * If nothing needs to be done to sync the table, we exit the worker without
1231 : : * any further action.
1232 : : *
1233 : : * The returned slot name is palloc'ed in current memory context.
 *
 * origin_startpos: output.  On a fresh copy it is filled in by
 * walrcv_create_slot(); when the persisted state is FINISHEDCOPY the copy is
 * skipped and it is read back with replorigin_session_get_progress().
 *
 * Flow: read the persisted relation state and publish it in shared memory;
 * call FinishSyncWorker() for SYNCDONE/READY/UNKNOWN; connect to the
 * publisher; then either resume after a finished copy, or create the slot
 * and origin, copy the table and persist FINISHEDCOPY.  In both cases the
 * in-memory state is finally set to SYNCWAIT and the function waits for
 * SUBREL_STATE_CATCHUP before returning the slot name.
 *
 * Raises ERROR on connection or remote command failure, if the replication
 * origin already exists on the fresh-copy path, and if the target relation
 * has row-level security enabled for the current user.
1234 : : */
1235 : : static char *
3141 peter_e@gmx.net 1236 : 195 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1237 : : {
1238 : : char *slotname;
1239 : : char *err;
1240 : : char relstate;
1241 : : XLogRecPtr relstate_lsn;
1242 : : Relation rel;
1243 : : AclResult aclresult;
1244 : : WalRcvExecResult *res;
1245 : : char originname[NAMEDATALEN];
1246 : : RepOriginId originid;
1247 : : UserContext ucxt;
1248 : : bool must_use_password;
1249 : : bool run_as_owner;
1250 : :
1251 : : /* Check the state of the table synchronization. */
1252 : 195 : StartTransactionCommand();
3113 fujii@postgresql.org 1253 : 195 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1254 : 195 : MyLogicalRepWorker->relid,
1255 : : &relstate_lsn);
742 akapila@postgresql.o 1256 : 195 : CommitTransactionCommand();
1257 : :
1258 : : /* Is the use of a password mandatory? */
866 1259 [ + + ]: 386 : must_use_password = MySubscription->passwordrequired &&
742 1260 [ - + ]: 191 : !MySubscription->ownersuperuser;
1261 : :
3141 peter_e@gmx.net 1262 [ - + ]: 195 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
3113 fujii@postgresql.org 1263 : 195 : MyLogicalRepWorker->relstate = relstate;
1264 : 195 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
3141 peter_e@gmx.net 1265 : 195 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1266 : :
1267 : : /*
1268 : : * If synchronization is already done or no longer necessary, exit now
1269 : : * that we've updated shared memory state.
1270 : : */
1839 alvherre@alvh.no-ip. 1271 [ - + ]: 195 : switch (relstate)
1272 : : {
1839 alvherre@alvh.no-ip. 1273 :UBC 0 : case SUBREL_STATE_SYNCDONE:
1274 : : case SUBREL_STATE_READY:
1275 : : case SUBREL_STATE_UNKNOWN:
12 akapila@postgresql.o 1276 :UNC 0 : FinishSyncWorker(); /* doesn't return */
1277 : : }
/* The switch has no default: every other state continues below. */
1278 : :
1279 : : /* Calculate the name of the tablesync slot. */
1716 akapila@postgresql.o 1280 :CBC 195 : slotname = (char *) palloc(NAMEDATALEN);
1281 : 195 : ReplicationSlotNameForTablesync(MySubscription->oid,
1282 : 195 : MyLogicalRepWorker->relid,
1283 : : slotname,
1284 : : NAMEDATALEN);
1285 : :
1286 : : /*
1287 : : * Here we use the slot name instead of the subscription name as the
1288 : : * application_name, so that it is different from the leader apply worker,
1289 : : * so that synchronous replication can distinguish them.
1290 : : */
1630 alvherre@alvh.no-ip. 1291 : 195 : LogRepWorkerWalRcvConn =
631 akapila@postgresql.o 1292 : 195 : walrcv_connect(MySubscription->conninfo, true, true,
1293 : : must_use_password,
1294 : : slotname, &err);
1630 alvherre@alvh.no-ip. 1295 [ - + ]: 195 : if (LogRepWorkerWalRcvConn == NULL)
3141 peter_e@gmx.net 1296 [ # # ]:UBC 0 : ereport(ERROR,
1297 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1298 : : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1299 : : MySubscription->name, err)));
1300 : :
1839 alvherre@alvh.no-ip. 1301 [ + + - + :CBC 195 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- - ]
1302 : : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1303 : : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1304 : :
1305 : : /* Assign the origin tracking record name. */
1113 akapila@postgresql.o 1306 : 195 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1307 : 195 : MyLogicalRepWorker->relid,
1308 : : originname,
1309 : : sizeof(originname));
1310 : :
1719 1311 [ + + ]: 195 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1312 : : {
1313 : : /*
1314 : : * We have previously errored out before finishing the copy so the
1315 : : * replication slot might exist. We want to remove the slot if it
1316 : : * already exists and proceed.
1317 : : *
1318 : : * XXX We could also instead try to drop the slot, last time we failed
1319 : : * but for that, we might need to clean up the copy state as it might
1320 : : * be in the middle of fetching the rows. Also, if there is a network
1321 : : * breakdown then it wouldn't have succeeded so trying it next time
1322 : : * seems like a better bet.
1323 : : */
1630 alvherre@alvh.no-ip. 1324 : 7 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1325 : : }
1719 akapila@postgresql.o 1326 [ - + ]: 188 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1327 : : {
1328 : : /*
1329 : : * The COPY phase was previously done, but tablesync then crashed
1330 : : * before it was able to finish normally.
1331 : : */
1719 akapila@postgresql.o 1332 :UBC 0 : StartTransactionCommand();
1333 : :
1334 : : /*
1335 : : * The origin tracking name must already exist. It was created first
1336 : : * time this tablesync was launched.
1337 : : */
1338 : 0 : originid = replorigin_by_name(originname, false);
1023 1339 : 0 : replorigin_session_setup(originid, 0);
1719 1340 : 0 : replorigin_session_origin = originid;
1341 : 0 : *origin_startpos = replorigin_session_get_progress(false);
1342 : :
1343 : 0 : CommitTransactionCommand();
1344 : :
1345 : 0 : goto copy_table_done;
1346 : : }
1347 : :
/*
 * Only the INIT and DATASYNC states get here (see the Assert above and the
 * goto in the FINISHEDCOPY branch).  Either way the in-memory state becomes
 * DATASYNC with no LSN, and is persisted just below.
 */
1839 alvherre@alvh.no-ip. 1348 [ - + ]:CBC 195 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1349 : 195 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1350 : 195 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1351 : 195 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1352 : :
1353 : : /* Update the state and make it visible to others. */
1354 : 195 : StartTransactionCommand();
1355 : 195 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1356 : 195 : MyLogicalRepWorker->relid,
1357 : 195 : MyLogicalRepWorker->relstate,
88 akapila@postgresql.o 1358 : 195 : MyLogicalRepWorker->relstate_lsn,
1359 : : false);
1839 alvherre@alvh.no-ip. 1360 : 195 : CommitTransactionCommand();
1301 andres@anarazel.de 1361 : 195 : pgstat_report_stat(true);
1362 : :
1839 alvherre@alvh.no-ip. 1363 : 195 : StartTransactionCommand();
1364 : :
1365 : : /*
1366 : : * Use a standard write lock here. It might be better to disallow access
1367 : : * to the table while it's being synchronized. But we don't want to block
1368 : : * the main apply process from working and it has to open the relation in
1369 : : * RowExclusiveLock when remapping remote relation id to local one.
1370 : : */
1371 : 195 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1372 : :
1373 : : /*
1374 : : * Start a transaction in the remote node in REPEATABLE READ mode. This
1375 : : * ensures that both the replication slot we create (see below) and the
1376 : : * COPY are consistent with each other.
1377 : : */
1630 1378 : 195 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1379 : : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1380 : : 0, NULL);
1839 1381 [ - + ]: 195 : if (res->status != WALRCV_OK_COMMAND)
1839 alvherre@alvh.no-ip. 1382 [ # # ]:UBC 0 : ereport(ERROR,
1383 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1384 : : errmsg("table copy could not start transaction on publisher: %s",
1385 : : res->err)));
1839 alvherre@alvh.no-ip. 1386 :CBC 195 : walrcv_clear_result(res);
1387 : :
1388 : : /*
1389 : : * Create a new permanent logical decoding slot. This slot will be used
1390 : : * for the catchup phase after COPY is done, so tell it to use the
1391 : : * snapshot to make the final data consistent.
1392 : : */
1567 akapila@postgresql.o 1393 : 195 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1394 : : slotname, false /* permanent */ , false /* two_phase */ ,
1395 : : MySubscription->failover,
1396 : : CRS_USE_SNAPSHOT, origin_startpos);
1397 : :
1398 : : /*
1399 : : * Setup replication origin tracking. The purpose of doing this before the
1400 : : * copy is to avoid doing the copy again due to any error in setting up
1401 : : * origin tracking.
1402 : : */
1719 1403 : 195 : originid = replorigin_by_name(originname, true);
1404 [ + - ]: 195 : if (!OidIsValid(originid))
1405 : : {
1406 : : /*
1407 : : * Origin tracking does not exist, so create it now.
1408 : : *
1409 : : * Then advance to the LSN got from walrcv_create_slot. This is WAL
1410 : : * logged for the purpose of recovery. Locks are to prevent the
1411 : : * replication origin from vanishing while advancing.
1412 : : */
1413 : 195 : originid = replorigin_create(originname);
1414 : :
1415 : 195 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1416 : 195 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1417 : : true /* go backward */ , true /* WAL log */ );
1418 : 195 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1419 : :
1023 1420 : 195 : replorigin_session_setup(originid, 0);
1719 1421 : 195 : replorigin_session_origin = originid;
1422 : : }
1423 : : else
1424 : : {
1719 akapila@postgresql.o 1425 [ # # ]:UBC 0 : ereport(ERROR,
1426 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1427 : : errmsg("replication origin \"%s\" already exists",
1428 : : originname)));
1429 : : }
1430 : :
1431 : : /*
1432 : : * Make sure that the copy command runs as the table owner, unless the
1433 : : * user has opted out of that behaviour.
1434 : : */
872 msawada@postgresql.o 1435 :CBC 195 : run_as_owner = MySubscription->runasowner;
1436 [ + + ]: 195 : if (!run_as_owner)
1437 : 194 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1438 : :
1439 : : /*
1440 : : * Check that our table sync worker has permission to insert into the
1441 : : * target table.
1442 : : */
1443 : 195 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1444 : : ACL_INSERT);
1445 [ - + ]: 195 : if (aclresult != ACLCHECK_OK)
872 msawada@postgresql.o 1446 :UBC 0 : aclcheck_error(aclresult,
1447 : 0 : get_relkind_objtype(rel->rd_rel->relkind),
1448 : 0 : RelationGetRelationName(rel));
1449 : :
1450 : : /*
1451 : : * COPY FROM does not honor RLS policies. That is not a problem for
1452 : : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1453 : : * who has it implicitly), but other roles should not be able to
1454 : : * circumvent RLS. Disallow logical replication into RLS enabled
1455 : : * relations for such roles.
1456 : : */
872 msawada@postgresql.o 1457 [ - + ]:CBC 195 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
872 msawada@postgresql.o 1458 [ # # ]:UBC 0 : ereport(ERROR,
1459 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1460 : : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1461 : : GetUserNameFromId(GetUserId(), true),
1462 : : RelationGetRelationName(rel))));
1463 : :
1464 : : /* Now do the initial data copy */
1300 tomas.vondra@postgre 1465 :CBC 195 : PushActiveSnapshot(GetTransactionSnapshot());
1466 : 195 : copy_table(rel);
1467 : 184 : PopActiveSnapshot();
1468 : :
1630 alvherre@alvh.no-ip. 1469 : 184 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1839 1470 [ - + ]: 184 : if (res->status != WALRCV_OK_COMMAND)
1839 alvherre@alvh.no-ip. 1471 [ # # ]:UBC 0 : ereport(ERROR,
1472 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1473 : : errmsg("table copy could not finish transaction on publisher: %s",
1474 : : res->err)));
1839 alvherre@alvh.no-ip. 1475 :CBC 184 : walrcv_clear_result(res);
1476 : :
861 tgl@sss.pgh.pa.us 1477 [ + + ]: 184 : if (!run_as_owner)
872 msawada@postgresql.o 1478 : 183 : RestoreUserContext(&ucxt);
1479 : :
/*
 * NOTE(review): closed with NoLock, so the RowExclusiveLock taken by
 * table_open() above is not released here -- presumably it is held until
 * the transaction ends; confirm.
 */
1839 alvherre@alvh.no-ip. 1480 : 184 : table_close(rel, NoLock);
1481 : :
1482 : : /* Make the copy visible. */
1483 : 184 : CommandCounterIncrement();
1484 : :
1485 : : /*
1486 : : * Update the persisted state to indicate the COPY phase is done; make it
1487 : : * visible to others.
1488 : : */
1719 akapila@postgresql.o 1489 : 184 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1490 : 184 : MyLogicalRepWorker->relid,
1491 : : SUBREL_STATE_FINISHEDCOPY,
88 1492 : 184 : MyLogicalRepWorker->relstate_lsn,
1493 : : false);
1494 : :
1719 1495 : 184 : CommitTransactionCommand();
1496 : :
1497 : 184 : copy_table_done:
/* Reached both after a fresh copy and through the FINISHEDCOPY shortcut above. */
1498 : :
1499 [ - + ]: 184 : elog(DEBUG1,
1500 : : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1501 : : originname, LSN_FORMAT_ARGS(*origin_startpos));
1502 : :
1503 : : /*
1504 : : * We are done with the initial data synchronization, update the state.
1505 : : */
1839 alvherre@alvh.no-ip. 1506 [ - + ]: 184 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1507 : 184 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1508 : 184 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1509 : 184 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1510 : :
1511 : : /*
1512 : : * Finally, wait until the leader apply worker tells us to catch up and
1513 : : * then return to let LogicalRepApplyLoop do it.
1514 : : */
1515 : 184 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
3141 peter_e@gmx.net 1516 : 184 : return slotname;
1517 : : }
1518 : :
1519 : : /*
1520 : : * Execute the initial sync with error handling. Disable the subscription,
1521 : : * if it's required.
1522 : : *
1523 : : * Allocate the slot name in long-lived context on return. Note that we don't
1524 : : * handle FATAL errors which are probably because of system resource error and
1525 : : * are not repeatable.
1526 : : */
1527 : : static void
817 akapila@postgresql.o 1528 : 195 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1529 : : {
1530 : 195 : char *sync_slotname = NULL;
1531 : :
1532 [ - + ]: 195 : Assert(am_tablesync_worker());
1533 : :
1534 [ + + ]: 195 : PG_TRY();
1535 : : {
1536 : : /* Call initial sync. */
1537 : 195 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1538 : : }
1539 : 11 : PG_CATCH();
1540 : : {
1541 [ + + ]: 11 : if (MySubscription->disableonerr)
1542 : 1 : DisableSubscriptionAndExit();
1543 : : else
1544 : : {
1545 : : /*
1546 : : * Report the worker failed during table synchronization. Abort
1547 : : * the current transaction so that the stats message is sent in an
1548 : : * idle state.
1549 : : */
1550 : 10 : AbortOutOfAnyTransaction();
1551 : 10 : pgstat_report_subscription_error(MySubscription->oid, false);
1552 : :
1553 : 10 : PG_RE_THROW();
1554 : : }
1555 : : }
1556 [ - + ]: 184 : PG_END_TRY();
1557 : :
1558 : : /* allocate slot name in long-lived context */
1559 : 184 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1560 : 184 : pfree(sync_slotname);
1561 : 184 : }
1562 : :
1563 : : /*
1564 : : * Runs the tablesync worker.
1565 : : *
1566 : : * It starts syncing tables. After a successful sync, sets streaming options
1567 : : * and starts streaming to catchup with apply worker.
1568 : : */
1569 : : static void
1570 : 195 : run_tablesync_worker()
1571 : : {
1572 : : char originname[NAMEDATALEN];
1573 : 195 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1574 : 195 : char *slotname = NULL;
1575 : : WalRcvStreamOptions options;
1576 : :
1577 : 195 : start_table_sync(&origin_startpos, &slotname);
1578 : :
1579 : 184 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1580 : 184 : MyLogicalRepWorker->relid,
1581 : : originname,
1582 : : sizeof(originname));
1583 : :
1584 : 184 : set_apply_error_context_origin(originname);
1585 : :
1586 : 184 : set_stream_options(&options, slotname, &origin_startpos);
1587 : :
1588 : 184 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1589 : :
1590 : : /* Apply the changes till we catchup with the apply worker. */
1591 : 184 : start_apply(origin_startpos);
817 akapila@postgresql.o 1592 :UBC 0 : }
1593 : :
1594 : : /* Logical Replication Tablesync worker entry point */
1595 : : void
817 akapila@postgresql.o 1596 :CBC 198 : TablesyncWorkerMain(Datum main_arg)
1597 : : {
1598 : 198 : int worker_slot = DatumGetInt32(main_arg);
1599 : :
1600 : 198 : SetupApplyOrSyncWorker(worker_slot);
1601 : :
1602 : 195 : run_tablesync_worker();
1603 : :
12 akapila@postgresql.o 1604 :UNC 0 : FinishSyncWorker();
1605 : : }
1606 : :
1607 : : /*
1608 : : * If the subscription has no tables then return false.
1609 : : *
1610 : : * Otherwise, are all tablesyncs READY?
1611 : : *
1612 : : * Note: This function is not suitable to be called from outside of apply or
1613 : : * tablesync workers because MySubscription needs to be already initialized.
1614 : : */
1615 : : bool
1567 akapila@postgresql.o 1616 :CBC 165 : AllTablesyncsReady(void)
1617 : : {
1618 : 165 : bool started_tx = false;
1619 : 165 : bool has_subrels = false;
1620 : :
1621 : : /* We need up-to-date sync state info for subscription tables here. */
12 akapila@postgresql.o 1622 :GNC 165 : has_subrels = FetchRelationStates(&started_tx);
1623 : :
1567 akapila@postgresql.o 1624 [ + + ]:CBC 165 : if (started_tx)
1625 : : {
1626 : 15 : CommitTransactionCommand();
1301 andres@anarazel.de 1627 : 15 : pgstat_report_stat(true);
1628 : : }
1629 : :
1630 : : /*
1631 : : * Return false when there are no tables in subscription or not all tables
1632 : : * are in ready state; true otherwise.
1633 : : */
1168 tgl@sss.pgh.pa.us 1634 [ + - + + ]: 165 : return has_subrels && (table_states_not_ready == NIL);
1635 : : }
1636 : :
1637 : : /*
1638 : : * Return whether the subscription currently has any tables.
1639 : : *
1640 : : * Note: Unlike HasSubscriptionTables(), this function relies on cached
1641 : : * information for subscription tables. Additionally, it should not be
1642 : : * invoked outside of apply or tablesync workers, as MySubscription must be
1643 : : * initialized first.
1644 : : */
1645 : : bool
12 akapila@postgresql.o 1646 :GNC 100 : HasSubscriptionTablesCached(void)
1647 : : {
1648 : : bool started_tx;
1649 : : bool has_subrels;
1650 : :
1651 : : /* We need up-to-date subscription tables info here */
1652 : 100 : has_subrels = FetchRelationStates(&started_tx);
1653 : :
50 1654 [ + + ]: 100 : if (started_tx)
1655 : : {
1656 : 2 : CommitTransactionCommand();
1657 : 2 : pgstat_report_stat(true);
1658 : : }
1659 : :
1660 : 100 : return has_subrels;
1661 : : }
1662 : :
1663 : : /*
1664 : : * Update the two_phase state of the specified subscription in pg_subscription.
1665 : : */
1666 : : void
1567 akapila@postgresql.o 1667 :CBC 10 : UpdateTwoPhaseState(Oid suboid, char new_state)
1668 : : {
1669 : : Relation rel;
1670 : : HeapTuple tup;
1671 : : bool nulls[Natts_pg_subscription];
1672 : : bool replaces[Natts_pg_subscription];
1673 : : Datum values[Natts_pg_subscription];
1674 : :
1675 [ + - + - : 10 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
- + ]
1676 : : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1677 : : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1678 : :
1679 : 10 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1680 : 10 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1681 [ - + ]: 10 : if (!HeapTupleIsValid(tup))
1567 akapila@postgresql.o 1682 [ # # ]:UBC 0 : elog(ERROR,
1683 : : "cache lookup failed for subscription oid %u",
1684 : : suboid);
1685 : :
1686 : : /* Form a new tuple. */
1567 akapila@postgresql.o 1687 :CBC 10 : memset(values, 0, sizeof(values));
1688 : 10 : memset(nulls, false, sizeof(nulls));
1689 : 10 : memset(replaces, false, sizeof(replaces));
1690 : :
1691 : : /* And update/set two_phase state */
1692 : 10 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1693 : 10 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1694 : :
1695 : 10 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1696 : : values, nulls, replaces);
1697 : 10 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1698 : :
1699 : 10 : heap_freetuple(tup);
1700 : 10 : table_close(rel, RowExclusiveLock);
1701 : 10 : }
|