Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * tablesync.c
3 : : * PostgreSQL logical replication: initial table data synchronization
4 : : *
5 : : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/tablesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for initial table data synchronization for
12 : : * logical replication.
13 : : *
14 : : * The initial data synchronization is done separately for each table,
15 : : * in a separate apply worker that only fetches the initial snapshot data
16 : : * from the publisher and then synchronizes the position in the stream with
17 : : * the leader apply worker.
18 : : *
19 : : * There are several reasons for doing the synchronization this way:
20 : : * - It allows us to parallelize the initial data synchronization
21 : : * which lowers the time needed for it to happen.
22 : : * - The initial synchronization does not have to hold the xid and LSN
23 : : * for the time it takes to copy data of all tables, causing less
24 : : * bloat and lower disk consumption compared to doing the
25 : : * synchronization in a single process for the whole database.
26 : : * - It allows us to synchronize any tables added after the initial
27 : : * synchronization has finished.
28 : : *
29 : : * The stream position synchronization works in multiple steps:
30 : : * - Apply worker requests a tablesync worker to start, setting the new
31 : : * table state to INIT.
32 : : * - Tablesync worker starts; changes table state from INIT to DATASYNC while
33 : : * copying.
34 : : * - Tablesync worker does initial table copy; there is a FINISHEDCOPY (sync
35 : : * worker specific) state to indicate when the copy phase has completed, so
36 : : * if the worker crashes with this (non-memory) state then the copy will not
37 : : * be re-attempted.
38 : : * - Tablesync worker then sets table state to SYNCWAIT; waits for state change.
39 : : * - Apply worker periodically checks for tables in SYNCWAIT state. When
40 : : * any appear, it sets the table state to CATCHUP and starts loop-waiting
41 : : * until either the table state is set to SYNCDONE or the sync worker
42 : : * exits.
43 : : * - After the sync worker has seen the state change to CATCHUP, it will
44 : : * read the stream and apply changes (acting like an apply worker) until
45 : : * it catches up to the specified stream position. Then it sets the
46 : : * state to SYNCDONE. There might be zero changes applied between
47 : : * CATCHUP and SYNCDONE, because the sync worker might be ahead of the
48 : : * apply worker.
49 : : * - Once the state is set to SYNCDONE, the apply will continue tracking
50 : : * the table until it reaches the SYNCDONE stream position, at which
51 : : * point it sets state to READY and stops tracking. Again, there might
52 : : * be zero changes in between.
53 : : *
54 : : * So the state progression is always: INIT -> DATASYNC -> FINISHEDCOPY
55 : : * -> SYNCWAIT -> CATCHUP -> SYNCDONE -> READY.
56 : : *
57 : : * The catalog pg_subscription_rel is used to keep information about
58 : : * subscribed tables and their state. The catalog holds all states
59 : : * except SYNCWAIT and CATCHUP which are only in shared memory.
60 : : *
61 : : * Example flows look like this:
62 : : * - Apply is in front:
63 : : * sync:8
64 : : * -> set in catalog FINISHEDCOPY
65 : : * -> set in memory SYNCWAIT
66 : : * apply:10
67 : : * -> set in memory CATCHUP
68 : : * -> enter wait-loop
69 : : * sync:10
70 : : * -> set in catalog SYNCDONE
71 : : * -> exit
72 : : * apply:10
73 : : * -> exit wait-loop
74 : : * -> continue rep
75 : : * apply:11
76 : : * -> set in catalog READY
77 : : *
78 : : * - Sync is in front:
79 : : * sync:10
80 : : * -> set in catalog FINISHEDCOPY
81 : : * -> set in memory SYNCWAIT
82 : : * apply:8
83 : : * -> set in memory CATCHUP
84 : : * -> continue per-table filtering
85 : : * sync:10
86 : : * -> set in catalog SYNCDONE
87 : : * -> exit
88 : : * apply:10
89 : : * -> set in catalog READY
90 : : * -> stop per-table filtering
91 : : * -> continue rep
92 : : *-------------------------------------------------------------------------
93 : : */
94 : :
95 : : #include "postgres.h"
96 : :
97 : : #include "access/table.h"
98 : : #include "access/xact.h"
99 : : #include "catalog/indexing.h"
100 : : #include "catalog/pg_subscription_rel.h"
101 : : #include "catalog/pg_type.h"
102 : : #include "commands/copy.h"
103 : : #include "miscadmin.h"
104 : : #include "nodes/makefuncs.h"
105 : : #include "parser/parse_relation.h"
106 : : #include "pgstat.h"
107 : : #include "replication/logicallauncher.h"
108 : : #include "replication/logicalrelation.h"
109 : : #include "replication/logicalworker.h"
110 : : #include "replication/origin.h"
111 : : #include "replication/slot.h"
112 : : #include "replication/walreceiver.h"
113 : : #include "replication/worker_internal.h"
114 : : #include "storage/ipc.h"
115 : : #include "storage/latch.h"
116 : : #include "storage/lmgr.h"
117 : : #include "utils/acl.h"
118 : : #include "utils/array.h"
119 : : #include "utils/builtins.h"
120 : : #include "utils/lsyscache.h"
121 : : #include "utils/rls.h"
122 : : #include "utils/snapmgr.h"
123 : : #include "utils/syscache.h"
124 : : #include "utils/usercontext.h"
125 : : #include "utils/wait_event.h"
126 : :
/*
 * List of subscription tables that are still being synchronized.  It is
 * walked by ProcessSyncingTablesForApply(), which treats NIL as "nothing
 * left to sync".  NOTE(review): not static, so presumably shared with other
 * files -- confirm against the header that declares it.
 */
127 : : List *table_states_not_ready = NIL;
128 : :
/*
 * Buffer state used by copy_read_data(): it points at the most recent chunk
 * handed back by walrcv_receive(), with 'cursor' marking how much of that
 * chunk has already been passed on to COPY.
 */
129 : : static StringInfo copybuf = NULL;
130 : :
131 : : /*
132 : : * Wait until the relation sync state is set in the catalog to the expected
133 : : * one; return true when it happens.
134 : : *
135 : : * Returns false if the table sync worker or the table itself have
136 : : * disappeared, or the table state has been reset.
137 : : *
138 : : * Currently, this is used in the apply worker when transitioning from
139 : : * CATCHUP state to SYNCDONE.
140 : : */
141 : : static bool
150 akapila@postgresql.o 142 :GNC 191 : wait_for_table_state_change(Oid relid, char expected_state)
143 : : {
144 : : char state;
145 : :
146 : : for (;;)
3279 peter_e@gmx.net 147 :CBC 212 : {
148 : : LogicalRepWorker *worker;
149 : : XLogRecPtr statelsn;
150 : :
3208 151 [ - + ]: 403 : CHECK_FOR_INTERRUPTS();
152 : :
1977 alvherre@alvh.no-ip. 153 : 403 : InvalidateCatalogSnapshot();
3204 peter_e@gmx.net 154 : 403 : state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
155 : : relid, &statelsn);
156 : :
157 [ - + ]: 403 : if (state == SUBREL_STATE_UNKNOWN)
1977 alvherre@alvh.no-ip. 158 :UBC 0 : break;
159 : :
3204 peter_e@gmx.net 160 [ + + ]:CBC 403 : if (state == expected_state)
161 : 191 : return true;
162 : :
163 : : /* Check if the sync worker is still running and bail if not. */
3279 164 : 212 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
138 akapila@postgresql.o 165 :GNC 212 : worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
166 : 212 : MyLogicalRepWorker->subid, relid,
167 : : false);
3204 peter_e@gmx.net 168 :CBC 212 : LWLockRelease(LogicalRepWorkerLock);
3279 169 [ - + ]: 212 : if (!worker)
1977 alvherre@alvh.no-ip. 170 :UBC 0 : break;
171 : :
2669 tmunro@postgresql.or 172 :CBC 212 : (void) WaitLatch(MyLatch,
173 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
174 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
175 : :
3204 andres@anarazel.de 176 : 212 : ResetLatch(MyLatch);
177 : : }
178 : :
3204 peter_e@gmx.net 179 :UBC 0 : return false;
180 : : }
181 : :
182 : : /*
183 : : * Wait until the apply worker changes the state of our synchronization
184 : : * worker to the expected one.
185 : : *
186 : : * Used when transitioning from SYNCWAIT state to CATCHUP.
187 : : *
188 : : * Returns false if the apply worker has disappeared.
189 : : */
190 : : static bool
3204 peter_e@gmx.net 191 :CBC 193 : wait_for_worker_state_change(char expected_state)
192 : : {
193 : : int rc;
194 : :
195 : : for (;;)
196 : 193 : {
197 : : LogicalRepWorker *worker;
198 : :
199 [ - + ]: 386 : CHECK_FOR_INTERRUPTS();
200 : :
201 : : /*
202 : : * Done if already in correct state. (We assume this fetch is atomic
203 : : * enough to not give a misleading answer if we do it with no lock.)
204 : : */
3180 tgl@sss.pgh.pa.us 205 [ + + ]: 386 : if (MyLogicalRepWorker->relstate == expected_state)
206 : 193 : return true;
207 : :
208 : : /*
209 : : * Bail out if the apply worker has died, else signal it we're
210 : : * waiting.
211 : : */
3204 peter_e@gmx.net 212 : 193 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
138 akapila@postgresql.o 213 :GNC 193 : worker = logicalrep_worker_find(WORKERTYPE_APPLY,
214 : 193 : MyLogicalRepWorker->subid, InvalidOid,
215 : : false);
3180 tgl@sss.pgh.pa.us 216 [ + - + - ]:CBC 193 : if (worker && worker->proc)
217 : 193 : logicalrep_worker_wakeup_ptr(worker);
3204 peter_e@gmx.net 218 : 193 : LWLockRelease(LogicalRepWorkerLock);
219 [ - + ]: 193 : if (!worker)
3180 tgl@sss.pgh.pa.us 220 :UBC 0 : break;
221 : :
222 : : /*
223 : : * Wait. We expect to get a latch signal back from the apply worker,
224 : : * but use a timeout in case it dies without sending one.
225 : : */
3204 andres@anarazel.de 226 :CBC 193 : rc = WaitLatch(MyLatch,
227 : : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
228 : : 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
229 : :
3180 tgl@sss.pgh.pa.us 230 [ + - ]: 193 : if (rc & WL_LATCH_SET)
231 : 193 : ResetLatch(MyLatch);
232 : : }
233 : :
3279 peter_e@gmx.net 234 :UBC 0 : return false;
235 : : }
236 : :
237 : : /*
238 : : * Handle table synchronization cooperation from the synchronization
239 : : * worker.
240 : : *
241 : : * If the sync worker is in CATCHUP state and reached (or passed) the
242 : : * predetermined synchronization point in the WAL stream, mark the table as
243 : : * SYNCDONE and finish.
244 : : */
245 : : void
150 akapila@postgresql.o 246 :GNC 205 : ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
247 : : {
3279 peter_e@gmx.net 248 [ - + ]:CBC 205 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
249 : :
250 [ + - ]: 205 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
251 [ + + ]: 205 : current_lsn >= MyLogicalRepWorker->relstate_lsn)
252 : : {
253 : : TimeLineID tli;
1857 akapila@postgresql.o 254 : 193 : char syncslotname[NAMEDATALEN] = {0};
1293 255 : 193 : char originname[NAMEDATALEN] = {0};
256 : :
3204 peter_e@gmx.net 257 : 193 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
3279 258 : 193 : MyLogicalRepWorker->relstate_lsn = current_lsn;
259 : :
260 : 193 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
261 : :
262 : : /*
263 : : * UpdateSubscriptionRelState must be called within a transaction.
264 : : */
1857 akapila@postgresql.o 265 [ + - ]: 193 : if (!IsTransactionState())
266 : 193 : StartTransactionCommand();
267 : :
2900 peter_e@gmx.net 268 : 193 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
269 : 193 : MyLogicalRepWorker->relid,
270 : 193 : MyLogicalRepWorker->relstate,
226 akapila@postgresql.o 271 : 193 : MyLogicalRepWorker->relstate_lsn,
272 : : false);
273 : :
274 : : /*
275 : : * End streaming so that LogRepWorkerWalRcvConn can be used to drop
276 : : * the slot.
277 : : */
1768 alvherre@alvh.no-ip. 278 : 193 : walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
279 : :
280 : : /*
281 : : * Cleanup the tablesync slot.
282 : : *
283 : : * This has to be done after updating the state because otherwise if
284 : : * there is an error while doing the database operations we won't be
285 : : * able to rollback dropped slot.
286 : : */
1857 akapila@postgresql.o 287 : 193 : ReplicationSlotNameForTablesync(MyLogicalRepWorker->subid,
288 : 193 : MyLogicalRepWorker->relid,
289 : : syncslotname,
290 : : sizeof(syncslotname));
291 : :
292 : : /*
293 : : * It is important to give an error if we are unable to drop the slot,
294 : : * otherwise, it won't be dropped till the corresponding subscription
295 : : * is dropped. So passing missing_ok = false.
296 : : */
1768 alvherre@alvh.no-ip. 297 : 193 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
298 : :
1280 akapila@postgresql.o 299 : 193 : CommitTransactionCommand();
300 : 193 : pgstat_report_stat(false);
301 : :
302 : : /*
303 : : * Start a new transaction to clean up the tablesync origin tracking.
304 : : * This transaction will be ended within the FinishSyncWorker(). Now,
305 : : * even, if we fail to remove this here, the apply worker will ensure
306 : : * to clean it up afterward.
307 : : *
308 : : * We need to do this after the table state is set to SYNCDONE.
309 : : * Otherwise, if an error occurs while performing the database
310 : : * operation, the worker will be restarted and the in-memory state of
311 : : * replication progress (remote_lsn) won't be rolled-back which would
312 : : * have been cleared before restart. So, the restarted worker will use
313 : : * invalid replication progress state resulting in replay of
314 : : * transactions that have already been applied.
315 : : */
316 : 193 : StartTransactionCommand();
317 : :
1251 318 : 193 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
319 : 193 : MyLogicalRepWorker->relid,
320 : : originname,
321 : : sizeof(originname));
322 : :
323 : : /*
324 : : * Resetting the origin session removes the ownership of the slot.
325 : : * This is needed to allow the origin to be dropped.
326 : : */
1280 327 : 193 : replorigin_session_reset();
46 msawada@postgresql.o 328 :GNC 193 : replorigin_xact_clear(true);
329 : :
330 : : /*
331 : : * Drop the tablesync's origin tracking if exists.
332 : : *
333 : : * There is a chance that the user is concurrently performing refresh
334 : : * for the subscription where we remove the table state and its origin
335 : : * or the apply worker would have removed this origin. So passing
336 : : * missing_ok = true.
337 : : */
1280 akapila@postgresql.o 338 :CBC 193 : replorigin_drop_by_name(originname, true, false);
339 : :
150 akapila@postgresql.o 340 :GNC 193 : FinishSyncWorker();
341 : : }
342 : : else
3279 peter_e@gmx.net 343 :CBC 12 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
344 : 12 : }
345 : :
346 : : /*
347 : : * Handle table synchronization cooperation from the apply worker.
348 : : *
349 : : * Walk over all subscription tables that are individually tracked by the
350 : : * apply process (currently, all that have state other than
351 : : * SUBREL_STATE_READY) and manage synchronization for them.
352 : : *
353 : : * If there are tables that need synchronizing and are not being synchronized
354 : : * yet, start sync workers for them (if there are free slots for sync
355 : : * workers). To prevent starting the sync worker for the same relation at a
356 : : * high frequency after a failure, we store its last start time with each sync
357 : : * state info. We start the sync worker for the same relation after waiting
358 : : * at least wal_retrieve_retry_interval.
359 : : *
360 : : * For tables that are being synchronized already, check if sync workers
361 : : * either need action from the apply worker or have finished. This is the
362 : : * SYNCWAIT to CATCHUP transition.
363 : : *
364 : : * If the synchronization position is reached (SYNCDONE), then the table can
365 : : * be marked as READY and is no longer tracked.
366 : : */
367 : : void
150 akapila@postgresql.o 368 :GNC 5420 : ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
369 : : {
370 : : struct tablesync_start_time_mapping
371 : : {
372 : : Oid relid;
373 : : TimestampTz last_start_time;
374 : : };
375 : : static HTAB *last_start_times = NULL;
376 : : ListCell *lc;
377 : : bool started_tx;
1164 tgl@sss.pgh.pa.us 378 :CBC 5420 : bool should_exit = false;
226 akapila@postgresql.o 379 : 5420 : Relation rel = NULL;
380 : :
3279 peter_e@gmx.net 381 [ - + ]: 5420 : Assert(!IsTransactionState());
382 : :
383 : : /* We need up-to-date sync state info for subscription tables here. */
130 akapila@postgresql.o 384 :GNC 5420 : FetchRelationStates(NULL, NULL, &started_tx);
385 : :
386 : : /*
387 : : * Prepare a hash table for tracking last start times of workers, to avoid
388 : : * immediate restarts. We don't need it if there are no tables that need
389 : : * syncing.
390 : : */
1306 tgl@sss.pgh.pa.us 391 [ + + + + ]:CBC 5420 : if (table_states_not_ready != NIL && !last_start_times)
3244 peter_e@gmx.net 392 : 128 : {
393 : : HASHCTL ctl;
394 : :
395 : 128 : ctl.keysize = sizeof(Oid);
396 : 128 : ctl.entrysize = sizeof(struct tablesync_start_time_mapping);
397 : 128 : last_start_times = hash_create("Logical replication table sync worker start times",
398 : : 256, &ctl, HASH_ELEM | HASH_BLOBS);
399 : : }
400 : :
401 : : /*
402 : : * Clean up the hash table when we're done with all tables (just to
403 : : * release the bit of memory).
404 : : */
1306 tgl@sss.pgh.pa.us 405 [ + + + + ]: 5292 : else if (table_states_not_ready == NIL && last_start_times)
406 : : {
3244 peter_e@gmx.net 407 : 95 : hash_destroy(last_start_times);
408 : 95 : last_start_times = NULL;
409 : : }
410 : :
411 : : /*
412 : : * Process all tables that are being synchronized.
413 : : */
1705 akapila@postgresql.o 414 [ + + + + : 7324 : foreach(lc, table_states_not_ready)
+ + ]
415 : : {
3224 bruce@momjian.us 416 : 1905 : SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc);
417 : :
130 akapila@postgresql.o 418 [ + + ]:GNC 1905 : if (!started_tx)
419 : : {
420 : 320 : StartTransactionCommand();
421 : 320 : started_tx = true;
422 : : }
423 : :
424 [ - + ]: 1905 : Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
425 : :
3279 peter_e@gmx.net 426 [ + + ]:CBC 1905 : if (rstate->state == SUBREL_STATE_SYNCDONE)
427 : : {
428 : : /*
429 : : * Apply has caught up to the position where the table sync has
430 : : * finished. Mark the table as ready so that the apply will just
431 : : * continue to replicate it normally.
432 : : */
433 [ + + ]: 189 : if (current_lsn >= rstate->lsn)
434 : : {
435 : : char originname[NAMEDATALEN];
436 : :
437 : 188 : rstate->state = SUBREL_STATE_READY;
438 : 188 : rstate->lsn = current_lsn;
439 : :
440 : : /*
441 : : * Remove the tablesync origin tracking if exists.
442 : : *
443 : : * There is a chance that the user is concurrently performing
444 : : * refresh for the subscription where we remove the table
445 : : * state and its origin or the tablesync worker would have
446 : : * already removed this origin. We can't rely on tablesync
447 : : * worker to remove the origin tracking as if there is any
448 : : * error while dropping we won't restart it to drop the
449 : : * origin. So passing missing_ok = true.
450 : : *
451 : : * Lock the subscription and origin in the same order as we
452 : : * are doing during DDL commands to avoid deadlocks. See
453 : : * AlterSubscription_refresh.
454 : : */
226 akapila@postgresql.o 455 : 188 : LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid,
456 : : 0, AccessShareLock);
457 : :
458 [ + + ]: 188 : if (!rel)
459 : 187 : rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
460 : :
1251 461 : 188 : ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
462 : : rstate->relid,
463 : : originname,
464 : : sizeof(originname));
1280 465 : 188 : replorigin_drop_by_name(originname, true, false);
466 : :
467 : : /*
468 : : * Update the state to READY only after the origin cleanup.
469 : : */
2900 peter_e@gmx.net 470 : 188 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
471 : 188 : rstate->relid, rstate->state,
472 : : rstate->lsn, true);
473 : : }
474 : : }
475 : : else
476 : : {
477 : : LogicalRepWorker *syncworker;
478 : :
479 : : /*
480 : : * Look for a sync worker for this relation.
481 : : */
3279 482 : 1716 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
483 : :
138 akapila@postgresql.o 484 :GNC 1716 : syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
485 : 1716 : MyLogicalRepWorker->subid,
486 : : rstate->relid, false);
487 : :
3279 peter_e@gmx.net 488 [ + + ]:CBC 1716 : if (syncworker)
489 : : {
490 : : /* Found one, update our copy of its state */
491 [ - + ]: 785 : SpinLockAcquire(&syncworker->relmutex);
492 : 785 : rstate->state = syncworker->relstate;
493 : 785 : rstate->lsn = syncworker->relstate_lsn;
3180 tgl@sss.pgh.pa.us 494 [ + + ]: 785 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
495 : : {
496 : : /*
497 : : * Sync worker is waiting for apply. Tell sync worker it
498 : : * can catchup now.
499 : : */
500 : 191 : syncworker->relstate = SUBREL_STATE_CATCHUP;
501 : 191 : syncworker->relstate_lsn =
502 : 191 : Max(syncworker->relstate_lsn, current_lsn);
503 : : }
3279 peter_e@gmx.net 504 : 785 : SpinLockRelease(&syncworker->relmutex);
505 : :
506 : : /* If we told worker to catch up, wait for it. */
3180 tgl@sss.pgh.pa.us 507 [ + + ]: 785 : if (rstate->state == SUBREL_STATE_SYNCWAIT)
508 : : {
509 : : /* Signal the sync worker, as it may be waiting for us. */
510 [ + - ]: 191 : if (syncworker->proc)
511 : 191 : logicalrep_worker_wakeup_ptr(syncworker);
512 : :
513 : : /* Now safe to release the LWLock */
514 : 191 : LWLockRelease(LogicalRepWorkerLock);
515 : :
825 akapila@postgresql.o 516 [ + - ]: 191 : if (started_tx)
517 : : {
518 : : /*
519 : : * We must commit the existing transaction to release
520 : : * the existing locks before entering a busy loop.
521 : : * This is required to avoid any undetected deadlocks
522 : : * due to any existing lock as deadlock detector won't
523 : : * be able to detect the waits on the latch.
524 : : *
525 : : * Also close any tables prior to the commit.
526 : : */
226 527 [ + + ]: 191 : if (rel)
528 : : {
529 : 28 : table_close(rel, NoLock);
530 : 28 : rel = NULL;
531 : : }
825 532 : 191 : CommitTransactionCommand();
533 : 191 : pgstat_report_stat(false);
534 : : }
535 : :
536 : : /*
537 : : * Enter busy loop and wait for synchronization worker to
538 : : * reach expected state (or die trying).
539 : : */
540 : 191 : StartTransactionCommand();
541 : 191 : started_tx = true;
542 : :
150 akapila@postgresql.o 543 :GNC 191 : wait_for_table_state_change(rstate->relid,
544 : : SUBREL_STATE_SYNCDONE);
545 : : }
546 : : else
3180 tgl@sss.pgh.pa.us 547 :CBC 594 : LWLockRelease(LogicalRepWorkerLock);
548 : : }
549 : : else
550 : : {
551 : : /*
552 : : * If there is no sync worker for this table yet, count
553 : : * running sync workers for this subscription, while we have
554 : : * the lock.
555 : : */
556 : : int nsyncworkers =
1031 557 : 931 : logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
558 : : struct tablesync_start_time_mapping *hentry;
559 : : bool found;
560 : :
561 : : /* Now safe to release the LWLock */
3180 562 : 931 : LWLockRelease(LogicalRepWorkerLock);
563 : :
130 akapila@postgresql.o 564 :GNC 931 : hentry = hash_search(last_start_times, &rstate->relid,
565 : : HASH_ENTER, &found);
566 [ + + ]: 931 : if (!found)
567 : 200 : hentry->last_start_time = 0;
568 : :
569 : 931 : launch_sync_worker(WORKERTYPE_TABLESYNC, nsyncworkers,
570 : : rstate->relid, &hentry->last_start_time);
571 : : }
572 : : }
573 : : }
574 : :
575 : : /* Close table if opened */
226 akapila@postgresql.o 576 [ + + ]:CBC 5419 : if (rel)
577 : 159 : table_close(rel, NoLock);
578 : :
579 : :
3233 peter_e@gmx.net 580 [ + + ]: 5419 : if (started_tx)
581 : : {
582 : : /*
583 : : * Even when the two_phase mode is requested by the user, it remains
584 : : * as 'pending' until all tablesyncs have reached READY state.
585 : : *
586 : : * When this happens, we restart the apply worker and (if the
587 : : * conditions are still ok) then the two_phase tri-state will become
588 : : * 'enabled' at that time.
589 : : *
590 : : * Note: If the subscription has no tables then leave the state as
591 : : * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
592 : : * work.
593 : : */
1164 tgl@sss.pgh.pa.us 594 [ + + ]: 1119 : if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
595 : : {
596 : 42 : CommandCounterIncrement(); /* make updates visible */
597 [ + + ]: 42 : if (AllTablesyncsReady())
598 : : {
599 [ + - ]: 6 : ereport(LOG,
600 : : (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
601 : : MySubscription->name)));
602 : 6 : should_exit = true;
603 : : }
604 : : }
605 : :
3233 peter_e@gmx.net 606 : 1119 : CommitTransactionCommand();
1439 andres@anarazel.de 607 : 1119 : pgstat_report_stat(true);
608 : : }
609 : :
1164 tgl@sss.pgh.pa.us 610 [ + + ]: 5419 : if (should_exit)
611 : : {
612 : : /*
613 : : * Reset the last-start time for this worker so that the launcher will
614 : : * restart it without waiting for wal_retrieve_retry_interval.
615 : : */
1148 616 : 6 : ApplyLauncherForgetWorkerStartTime(MySubscription->oid);
617 : :
1164 618 : 6 : proc_exit(0);
619 : : }
3279 peter_e@gmx.net 620 : 5413 : }
621 : :
622 : : /*
623 : : * Create list of columns for COPY based on logical relation mapping.
624 : : */
625 : : static List *
626 : 204 : make_copy_attnamelist(LogicalRepRelMapEntry *rel)
627 : : {
628 : 204 : List *attnamelist = NIL;
629 : : int i;
630 : :
3223 631 [ + + ]: 542 : for (i = 0; i < rel->remoterel.natts; i++)
632 : : {
3279 633 : 338 : attnamelist = lappend(attnamelist,
3223 634 : 338 : makeString(rel->remoterel.attnames[i]));
635 : : }
636 : :
637 : :
3279 638 : 204 : return attnamelist;
639 : : }
640 : :
641 : : /*
642 : : * Data source callback for the COPY FROM, which reads from the remote
643 : : * connection and passes the data back to our local COPY.
644 : : */
645 : : static int
646 : 15022 : copy_read_data(void *outbuf, int minread, int maxread)
647 : : {
3224 bruce@momjian.us 648 : 15022 : int bytesread = 0;
649 : : int avail;
650 : :
651 : : /* If there are some leftover data from previous read, use it. */
3279 peter_e@gmx.net 652 : 15022 : avail = copybuf->len - copybuf->cursor;
653 [ - + ]: 15022 : if (avail)
654 : : {
3279 peter_e@gmx.net 655 [ # # ]:UBC 0 : if (avail > maxread)
656 : 0 : avail = maxread;
657 : 0 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
658 : 0 : copybuf->cursor += avail;
659 : 0 : maxread -= avail;
660 : 0 : bytesread += avail;
661 : : }
662 : :
3208 peter_e@gmx.net 663 [ + - + - ]:CBC 15022 : while (maxread > 0 && bytesread < minread)
664 : : {
3279 665 : 15022 : pgsocket fd = PGINVALID_SOCKET;
666 : : int len;
667 : 15022 : char *buf = NULL;
668 : :
669 : : for (;;)
670 : : {
671 : : /* Try read the data. */
1768 alvherre@alvh.no-ip. 672 : 15022 : len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
673 : :
3279 peter_e@gmx.net 674 [ - + ]: 15022 : CHECK_FOR_INTERRUPTS();
675 : :
676 [ - + ]: 15022 : if (len == 0)
3279 peter_e@gmx.net 677 :UBC 0 : break;
3279 peter_e@gmx.net 678 [ + + ]:CBC 15022 : else if (len < 0)
679 : 15022 : return bytesread;
680 : : else
681 : : {
682 : : /* Process the data */
683 : 14820 : copybuf->data = buf;
684 : 14820 : copybuf->len = len;
685 : 14820 : copybuf->cursor = 0;
686 : :
687 : 14820 : avail = copybuf->len - copybuf->cursor;
688 [ - + ]: 14820 : if (avail > maxread)
3279 peter_e@gmx.net 689 :UBC 0 : avail = maxread;
3279 peter_e@gmx.net 690 :CBC 14820 : memcpy(outbuf, ©buf->data[copybuf->cursor], avail);
472 peter@eisentraut.org 691 : 14820 : outbuf = (char *) outbuf + avail;
3279 peter_e@gmx.net 692 : 14820 : copybuf->cursor += avail;
693 : 14820 : maxread -= avail;
694 : 14820 : bytesread += avail;
695 : : }
696 : :
697 [ + - + - ]: 14820 : if (maxread <= 0 || bytesread >= minread)
698 : 14820 : return bytesread;
699 : : }
700 : :
701 : : /*
702 : : * Wait for more data or latch.
703 : : */
2669 tmunro@postgresql.or 704 :UBC 0 : (void) WaitLatchOrSocket(MyLatch,
705 : : WL_SOCKET_READABLE | WL_LATCH_SET |
706 : : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
707 : : fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
708 : :
3204 andres@anarazel.de 709 : 0 : ResetLatch(MyLatch);
710 : : }
711 : :
3279 peter_e@gmx.net 712 : 0 : return bytesread;
713 : : }
714 : :
715 : :
716 : : /*
717 : : * Get information about remote relation in similar fashion the RELATION
718 : : * message provides during replication.
719 : : *
720 : : * This function also returns (a) the relation qualifications to be used in
721 : : * the COPY command, and (b) whether the remote relation has published any
722 : : * generated column.
723 : : */
724 : : static void
501 akapila@postgresql.o 725 :CBC 207 : fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
726 : : List **qual, bool *gencol_published)
727 : : {
728 : : WalRcvExecResult *res;
729 : : StringInfoData cmd;
730 : : TupleTableSlot *slot;
2187 peter@eisentraut.org 731 : 207 : Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
501 akapila@postgresql.o 732 : 207 : Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
1482 733 : 207 : Oid qualRow[] = {TEXTOID};
734 : : bool isnull;
735 : : int natt;
506 michael@paquier.xyz 736 : 207 : StringInfo pub_names = NULL;
1450 tomas.vondra@postgre 737 : 207 : Bitmapset *included_cols = NULL;
501 akapila@postgresql.o 738 : 207 : int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
739 : :
3279 peter_e@gmx.net 740 : 207 : lrel->nspname = nspname;
741 : 207 : lrel->relname = relname;
742 : :
743 : : /* First fetch Oid and replica identity. */
744 : 207 : initStringInfo(&cmd);
2187 peter@eisentraut.org 745 : 207 : appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
746 : : " FROM pg_catalog.pg_class c"
747 : : " INNER JOIN pg_catalog.pg_namespace n"
748 : : " ON (c.relnamespace = n.oid)"
749 : : " WHERE n.nspname = %s"
750 : : " AND c.relname = %s",
751 : : quote_literal_cstr(nspname),
752 : : quote_literal_cstr(relname));
1768 alvherre@alvh.no-ip. 753 : 207 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
754 : : lengthof(tableRow), tableRow);
755 : :
3279 peter_e@gmx.net 756 [ - + ]: 207 : if (res->status != WALRCV_OK_TUPLES)
3279 peter_e@gmx.net 757 [ # # ]:UBC 0 : ereport(ERROR,
758 : : (errcode(ERRCODE_CONNECTION_FAILURE),
759 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
760 : : nspname, relname, res->err)));
761 : :
2677 andres@anarazel.de 762 :CBC 207 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3279 peter_e@gmx.net 763 [ - + ]: 207 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
3279 peter_e@gmx.net 764 [ # # ]:UBC 0 : ereport(ERROR,
765 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
766 : : errmsg("table \"%s.%s\" not found on publisher",
767 : : nspname, relname)));
768 : :
3279 peter_e@gmx.net 769 :CBC 207 : lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
770 [ - + ]: 207 : Assert(!isnull);
771 : 207 : lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
772 [ - + ]: 207 : Assert(!isnull);
2187 peter@eisentraut.org 773 : 207 : lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
774 [ - + ]: 207 : Assert(!isnull);
775 : :
3279 peter_e@gmx.net 776 : 207 : ExecDropSingleTupleTableSlot(slot);
777 : 207 : walrcv_clear_result(res);
778 : :
779 : :
780 : : /*
781 : : * Get column lists for each relation.
782 : : *
783 : : * We need to do this before fetching info about column names and types,
784 : : * so that we can skip columns that should not be replicated.
785 : : */
501 akapila@postgresql.o 786 [ + - ]: 207 : if (server_version >= 150000)
787 : : {
788 : : WalRcvExecResult *pubres;
789 : : TupleTableSlot *tslot;
1382 790 : 207 : Oid attrsRow[] = {INT2VECTOROID};
791 : :
792 : : /* Build the pub_names comma-separated string. */
506 michael@paquier.xyz 793 : 207 : pub_names = makeStringInfo();
794 : 207 : GetPublicationsStr(MySubscription->publications, pub_names, true);
795 : :
796 : : /*
797 : : * Fetch info about column lists for the relation (from all the
798 : : * publications).
799 : : */
1450 tomas.vondra@postgre 800 : 207 : resetStringInfo(&cmd);
801 : 207 : appendStringInfo(&cmd,
802 : : "SELECT DISTINCT"
803 : : " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
804 : : " THEN NULL ELSE gpt.attrs END)"
805 : : " FROM pg_publication p,"
806 : : " LATERAL pg_get_publication_tables(p.pubname) gpt,"
807 : : " pg_class c"
808 : : " WHERE gpt.relid = %u AND c.oid = gpt.relid"
809 : : " AND p.pubname IN ( %s )",
810 : : lrel->remoteid,
811 : : pub_names->data);
812 : :
813 : 207 : pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
814 : : lengthof(attrsRow), attrsRow);
815 : :
816 [ - + ]: 207 : if (pubres->status != WALRCV_OK_TUPLES)
1450 tomas.vondra@postgre 817 [ # # ]:UBC 0 : ereport(ERROR,
818 : : (errcode(ERRCODE_CONNECTION_FAILURE),
819 : : errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s",
820 : : nspname, relname, pubres->err)));
821 : :
822 : : /*
823 : : * We don't support the case where the column list is different for
824 : : * the same table when combining publications. See comments atop
825 : : * fetch_relation_list. So there should be only one row returned.
826 : : * Although we already checked this when creating the subscription, we
827 : : * still need to check here in case the column list was changed after
828 : : * creating the subscription and before the sync worker is started.
829 : : */
1382 akapila@postgresql.o 830 [ - + ]:CBC 207 : if (tuplestore_tuple_count(pubres->tuplestore) > 1)
1382 akapila@postgresql.o 831 [ # # ]:UBC 0 : ereport(ERROR,
832 : : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
833 : : errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
834 : : nspname, relname));
835 : :
836 : : /*
837 : : * Get the column list and build a single bitmap with the attnums.
838 : : *
839 : : * If we find a NULL value, it means all the columns should be
840 : : * replicated.
841 : : */
1303 drowley@postgresql.o 842 :CBC 207 : tslot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple);
843 [ + - ]: 207 : if (tuplestore_gettupleslot(pubres->tuplestore, true, false, tslot))
844 : : {
845 : 207 : Datum cfval = slot_getattr(tslot, 1, &isnull);
846 : :
1382 akapila@postgresql.o 847 [ + + ]: 207 : if (!isnull)
848 : : {
849 : : ArrayType *arr;
850 : : int nelems;
851 : : int16 *elems;
852 : :
853 : 22 : arr = DatumGetArrayTypeP(cfval);
854 : 22 : nelems = ARR_DIMS(arr)[0];
855 [ - + ]: 22 : elems = (int16 *) ARR_DATA_PTR(arr);
856 : :
857 [ + + ]: 59 : for (natt = 0; natt < nelems; natt++)
858 : 37 : included_cols = bms_add_member(included_cols, elems[natt]);
859 : : }
860 : :
1303 drowley@postgresql.o 861 : 207 : ExecClearTuple(tslot);
862 : : }
863 : 207 : ExecDropSingleTupleTableSlot(tslot);
864 : :
1450 tomas.vondra@postgre 865 : 207 : walrcv_clear_result(pubres);
866 : : }
867 : :
868 : : /*
869 : : * Now fetch column names and types.
870 : : */
3279 peter_e@gmx.net 871 : 207 : resetStringInfo(&cmd);
338 drowley@postgresql.o 872 : 207 : appendStringInfoString(&cmd,
873 : : "SELECT a.attnum,"
874 : : " a.attname,"
875 : : " a.atttypid,"
876 : : " a.attnum = ANY(i.indkey)");
877 : :
878 : : /* Generated columns can be replicated since version 18. */
501 akapila@postgresql.o 879 [ + - ]: 207 : if (server_version >= 180000)
338 drowley@postgresql.o 880 : 207 : appendStringInfoString(&cmd, ", a.attgenerated != ''");
881 : :
501 akapila@postgresql.o 882 [ + - ]: 414 : appendStringInfo(&cmd,
883 : : " FROM pg_catalog.pg_attribute a"
884 : : " LEFT JOIN pg_catalog.pg_index i"
885 : : " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
886 : : " WHERE a.attnum > 0::pg_catalog.int2"
887 : : " AND NOT a.attisdropped %s"
888 : : " AND a.attrelid = %u"
889 : : " ORDER BY a.attnum",
890 : : lrel->remoteid,
891 [ - + ]: 207 : (server_version >= 120000 && server_version < 180000 ?
892 : : "AND a.attgenerated = ''" : ""),
893 : : lrel->remoteid);
1768 alvherre@alvh.no-ip. 894 [ + - ]: 207 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
895 : : server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
896 : :
3279 peter_e@gmx.net 897 [ - + ]: 206 : if (res->status != WALRCV_OK_TUPLES)
3279 peter_e@gmx.net 898 [ # # ]:UBC 0 : ereport(ERROR,
899 : : (errcode(ERRCODE_CONNECTION_FAILURE),
900 : : errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
901 : : nspname, relname, res->err)));
902 : :
903 : : /* We don't know the number of rows coming, so allocate enough space. */
9 msawada@postgresql.o 904 :GNC 206 : lrel->attnames = palloc0_array(char *, MaxTupleAttributeNumber);
905 : 206 : lrel->atttyps = palloc0_array(Oid, MaxTupleAttributeNumber);
3279 peter_e@gmx.net 906 :CBC 206 : lrel->attkeys = NULL;
907 : :
908 : : /*
909 : : * Store the columns as a list of names. Ignore those that are not
910 : : * present in the column list, if there is one.
911 : : */
912 : 206 : natt = 0;
2677 andres@anarazel.de 913 : 206 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
3279 peter_e@gmx.net 914 [ + + ]: 581 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
915 : : {
916 : : char *rel_colname;
917 : : AttrNumber attnum;
918 : :
1450 tomas.vondra@postgre 919 : 375 : attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull));
920 [ - + ]: 375 : Assert(!isnull);
921 : :
922 : : /* If the column is not in the column list, skip it. */
923 [ + + + + ]: 375 : if (included_cols != NULL && !bms_is_member(attnum, included_cols))
924 : : {
925 : 31 : ExecClearTuple(slot);
926 : 31 : continue;
927 : : }
928 : :
929 : 344 : rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
3279 peter_e@gmx.net 930 [ - + ]: 344 : Assert(!isnull);
931 : :
1450 tomas.vondra@postgre 932 : 344 : lrel->attnames[natt] = rel_colname;
933 : 344 : lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
3279 peter_e@gmx.net 934 [ - + ]: 344 : Assert(!isnull);
935 : :
1450 tomas.vondra@postgre 936 [ + + ]: 344 : if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
3279 peter_e@gmx.net 937 : 110 : lrel->attkeys = bms_add_member(lrel->attkeys, natt);
938 : :
939 : : /* Remember if the remote table has published any generated column. */
501 akapila@postgresql.o 940 [ + - + - ]: 344 : if (server_version >= 180000 && !(*gencol_published))
941 : : {
942 : 344 : *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
943 [ - + ]: 344 : Assert(!isnull);
944 : : }
945 : :
946 : : /* Should never happen. */
3279 peter_e@gmx.net 947 [ - + ]: 344 : if (++natt >= MaxTupleAttributeNumber)
3279 peter_e@gmx.net 948 [ # # ]:UBC 0 : elog(ERROR, "too many columns in remote table \"%s.%s\"",
949 : : nspname, relname);
950 : :
3279 peter_e@gmx.net 951 :CBC 344 : ExecClearTuple(slot);
952 : : }
953 : 206 : ExecDropSingleTupleTableSlot(slot);
954 : :
955 : 206 : lrel->natts = natt;
956 : :
957 : 206 : walrcv_clear_result(res);
958 : :
959 : : /*
960 : : * Get relation's row filter expressions. DISTINCT avoids the same
961 : : * expression of a table in multiple publications from being included
962 : : * multiple times in the final expression.
963 : : *
964 : : * We need to copy the row even if it matches just one of the
965 : : * publications, so we later combine all the quals with OR.
966 : : *
967 : : * For initial synchronization, row filtering can be ignored in following
968 : : * cases:
969 : : *
970 : : * 1) one of the subscribed publications for the table hasn't specified
971 : : * any row filter
972 : : *
973 : : * 2) one of the subscribed publications has puballtables set to true
974 : : *
975 : : * 3) one of the subscribed publications is declared as TABLES IN SCHEMA
976 : : * that includes this relation
977 : : */
501 akapila@postgresql.o 978 [ + - ]: 206 : if (server_version >= 150000)
979 : : {
980 : : /* Reuse the already-built pub_names. */
506 michael@paquier.xyz 981 [ - + ]: 206 : Assert(pub_names != NULL);
982 : :
983 : : /* Check for row filters. */
1482 akapila@postgresql.o 984 : 206 : resetStringInfo(&cmd);
985 : 206 : appendStringInfo(&cmd,
986 : : "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
987 : : " FROM pg_publication p,"
988 : : " LATERAL pg_get_publication_tables(p.pubname) gpt"
989 : : " WHERE gpt.relid = %u"
990 : : " AND p.pubname IN ( %s )",
991 : : lrel->remoteid,
992 : : pub_names->data);
993 : :
994 : 206 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
995 : :
996 [ - + ]: 206 : if (res->status != WALRCV_OK_TUPLES)
1482 akapila@postgresql.o 997 [ # # ]:UBC 0 : ereport(ERROR,
998 : : (errmsg("could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
999 : : nspname, relname, res->err)));
1000 : :
1001 : : /*
1002 : : * Multiple row filter expressions for the same table will be combined
1003 : : * by COPY using OR. If any of the filter expressions for this table
1004 : : * are null, it means the whole table will be copied. In this case it
1005 : : * is not necessary to construct a unified row filter expression at
1006 : : * all.
1007 : : */
1482 akapila@postgresql.o 1008 :CBC 206 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
1009 [ + + ]: 221 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
1010 : : {
1011 : 210 : Datum rf = slot_getattr(slot, 1, &isnull);
1012 : :
1013 [ + + ]: 210 : if (!isnull)
1014 : 15 : *qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
1015 : : else
1016 : : {
1017 : : /* Ignore filters and cleanup as necessary. */
1018 [ + + ]: 195 : if (*qual)
1019 : : {
1020 : 3 : list_free_deep(*qual);
1021 : 3 : *qual = NIL;
1022 : : }
1023 : 195 : break;
1024 : : }
1025 : :
1026 : 15 : ExecClearTuple(slot);
1027 : : }
1028 : 206 : ExecDropSingleTupleTableSlot(slot);
1029 : :
1030 : 206 : walrcv_clear_result(res);
506 michael@paquier.xyz 1031 : 206 : destroyStringInfo(pub_names);
1032 : : }
1033 : :
3279 peter_e@gmx.net 1034 : 206 : pfree(cmd.data);
1035 : 206 : }
1036 : :
1037 : : /*
1038 : : * Copy existing data of a table from publisher.
1039 : : *
1040 : : * Caller is responsible for locking the local relation.
1041 : : */
1042 : : static void
1043 : 207 : copy_table(Relation rel)
1044 : : {
1045 : : LogicalRepRelMapEntry *relmapentry;
1046 : : LogicalRepRelation lrel;
1482 akapila@postgresql.o 1047 : 207 : List *qual = NIL;
1048 : : WalRcvExecResult *res;
1049 : : StringInfoData cmd;
1050 : : CopyFromState cstate;
1051 : : List *attnamelist;
1052 : : ParseState *pstate;
1088 1053 : 207 : List *options = NIL;
501 1054 : 207 : bool gencol_published = false;
1055 : :
1056 : : /* Get the publisher relation info. */
3279 peter_e@gmx.net 1057 : 207 : fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
501 akapila@postgresql.o 1058 : 207 : RelationGetRelationName(rel), &lrel, &qual,
1059 : : &gencol_published);
1060 : :
1061 : : /* Put the relation into relmap. */
3279 peter_e@gmx.net 1062 : 206 : logicalrep_relmap_update(&lrel);
1063 : :
1064 : : /* Map the publisher relation to local one. */
1065 : 206 : relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
1066 [ - + ]: 204 : Assert(rel == relmapentry->localrel);
1067 : :
1068 : : /* Start copy on the publisher. */
1069 : 204 : initStringInfo(&cmd);
1070 : :
1071 : : /* Regular or partitioned table with no row filter or generated columns */
115 msawada@postgresql.o 1072 [ + + + - ]:GNC 204 : if ((lrel.relkind == RELKIND_RELATION || lrel.relkind == RELKIND_PARTITIONED_TABLE)
1073 [ + + + + ]: 204 : && qual == NIL && !gencol_published)
1074 : : {
844 akapila@postgresql.o 1075 :CBC 190 : appendStringInfo(&cmd, "COPY %s",
2187 peter@eisentraut.org 1076 : 190 : quote_qualified_identifier(lrel.nspname, lrel.relname));
1077 : :
1078 : : /* If the table has columns, then specify the columns */
844 akapila@postgresql.o 1079 [ + + ]: 190 : if (lrel.natts)
1080 : : {
1081 : 189 : appendStringInfoString(&cmd, " (");
1082 : :
1083 : : /*
1084 : : * XXX Do we need to list the columns in all cases? Maybe we're
1085 : : * replicating all columns?
1086 : : */
1087 [ + + ]: 506 : for (int i = 0; i < lrel.natts; i++)
1088 : : {
1089 [ + + ]: 317 : if (i > 0)
1090 : 128 : appendStringInfoString(&cmd, ", ");
1091 : :
1092 : 317 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1093 : : }
1094 : :
704 drowley@postgresql.o 1095 : 189 : appendStringInfoChar(&cmd, ')');
1096 : : }
1097 : :
844 akapila@postgresql.o 1098 : 190 : appendStringInfoString(&cmd, " TO STDOUT");
1099 : : }
1100 : : else
1101 : : {
1102 : : /*
1103 : : * For non-tables and tables with row filters, we need to do COPY
1104 : : * (SELECT ...), but we can't just do SELECT * because we may need to
1105 : : * copy only subset of columns including generated columns. For tables
1106 : : * with any row filters, build a SELECT query with OR'ed row filters
1107 : : * for COPY.
1108 : : *
1109 : : * We also need to use this same COPY (SELECT ...) syntax when
1110 : : * generated columns are published, because copy of generated columns
1111 : : * is not supported by the normal COPY.
1112 : : */
1977 drowley@postgresql.o 1113 : 14 : appendStringInfoString(&cmd, "COPY (SELECT ");
2187 peter@eisentraut.org 1114 [ + + ]: 35 : for (int i = 0; i < lrel.natts; i++)
1115 : : {
1116 : 21 : appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
1117 [ + + ]: 21 : if (i < lrel.natts - 1)
1118 : 7 : appendStringInfoString(&cmd, ", ");
1119 : : }
1120 : :
1482 akapila@postgresql.o 1121 : 14 : appendStringInfoString(&cmd, " FROM ");
1122 : :
1123 : : /*
1124 : : * For regular tables, make sure we don't copy data from a child that
1125 : : * inherits the named table as those will be copied separately.
1126 : : */
1127 [ + + ]: 14 : if (lrel.relkind == RELKIND_RELATION)
1128 : 11 : appendStringInfoString(&cmd, "ONLY ");
1129 : :
1130 : 14 : appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
1131 : : /* list of OR'ed filters */
1132 [ + + ]: 14 : if (qual != NIL)
1133 : : {
1134 : : ListCell *lc;
1135 : 11 : char *q = strVal(linitial(qual));
1136 : :
1137 : 11 : appendStringInfo(&cmd, " WHERE %s", q);
1138 [ + - + + : 12 : for_each_from(lc, qual, 1)
+ + ]
1139 : : {
1140 : 1 : q = strVal(lfirst(lc));
1141 : 1 : appendStringInfo(&cmd, " OR %s", q);
1142 : : }
1143 : 11 : list_free_deep(qual);
1144 : : }
1145 : :
1146 : 14 : appendStringInfoString(&cmd, ") TO STDOUT");
1147 : : }
1148 : :
1149 : : /*
1150 : : * Prior to v16, initial table synchronization will use text format even
1151 : : * if the binary option is enabled for a subscription.
1152 : : */
1088 1153 [ + - ]: 204 : if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 160000 &&
1154 [ + + ]: 204 : MySubscription->binary)
1155 : : {
1156 : 5 : appendStringInfoString(&cmd, " WITH (FORMAT binary)");
1157 : 5 : options = list_make1(makeDefElem("format",
1158 : : (Node *) makeString("binary"), -1));
1159 : : }
1160 : :
1768 alvherre@alvh.no-ip. 1161 : 204 : res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
3279 peter_e@gmx.net 1162 : 204 : pfree(cmd.data);
1163 [ - + ]: 204 : if (res->status != WALRCV_OK_COPY_OUT)
3279 peter_e@gmx.net 1164 [ # # ]:UBC 0 : ereport(ERROR,
1165 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1166 : : errmsg("could not start initial contents copy for table \"%s.%s\": %s",
1167 : : lrel.nspname, lrel.relname, res->err)));
3279 peter_e@gmx.net 1168 :CBC 204 : walrcv_clear_result(res);
1169 : :
1170 : 204 : copybuf = makeStringInfo();
1171 : :
3254 1172 : 204 : pstate = make_parsestate(NULL);
2264 tgl@sss.pgh.pa.us 1173 : 204 : (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock,
1174 : : NULL, false, false);
1175 : :
3279 peter_e@gmx.net 1176 : 204 : attnamelist = make_copy_attnamelist(relmapentry);
1088 akapila@postgresql.o 1177 : 204 : cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options);
1178 : :
1179 : : /* Do the copy */
3279 peter_e@gmx.net 1180 : 203 : (void) CopyFrom(cstate);
1181 : :
1182 : 193 : logicalrep_rel_close(relmapentry, NoLock);
1183 : 193 : }
1184 : :
1185 : : /*
1186 : : * Determine the tablesync slot name.
1187 : : *
1188 : : * The name must not exceed NAMEDATALEN - 1 because of remote node constraints
1189 : : * on slot name length. We append system_identifier to avoid slot_name
1190 : : * collision with subscriptions in other clusters. With the current scheme
1191 : : * pg_%u_sync_%u_UINT64_FORMAT (3 + 10 + 6 + 10 + 20 + '\0'), the maximum
1192 : : * length of slot_name will be 50.
1193 : : *
1194 : : * The returned slot name is stored in the supplied buffer (syncslotname) with
1195 : : * the given size.
1196 : : *
1197 : : * Note: We don't use the subscription slot name as part of tablesync slot name
1198 : : * because we are responsible for cleaning up these slots and it could become
1199 : : * impossible to recalculate what name to cleanup if the subscription slot name
1200 : : * had changed.
1201 : : */
1202 : : void
1857 akapila@postgresql.o 1203 : 405 : ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
1204 : : char *syncslotname, Size szslot)
1205 : : {
1854 1206 : 405 : snprintf(syncslotname, szslot, "pg_%u_sync_%u_" UINT64_FORMAT, suboid,
1207 : : relid, GetSystemIdentifier());
1857 1208 : 405 : }
1209 : :
1210 : : /*
1211 : : * Start syncing the table in the sync worker.
1212 : : *
1213 : : * If nothing needs to be done to sync the table, we exit the worker without
1214 : : * any further action.
1215 : : *
1216 : : * The returned slot name is palloc'ed in current memory context.
1217 : : */
1218 : : static char *
3279 peter_e@gmx.net 1219 : 207 : LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
1220 : : {
1221 : : char *slotname;
1222 : : char *err;
1223 : : char relstate;
1224 : : XLogRecPtr relstate_lsn;
1225 : : Relation rel;
1226 : : AclResult aclresult;
1227 : : WalRcvExecResult *res;
1228 : : char originname[NAMEDATALEN];
1229 : : ReplOriginId originid;
1230 : : UserContext ucxt;
1231 : : bool must_use_password;
1232 : : bool run_as_owner;
1233 : :
1234 : : /* Check the state of the table synchronization. */
1235 : 207 : StartTransactionCommand();
3251 fujii@postgresql.org 1236 : 207 : relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
1237 : 207 : MyLogicalRepWorker->relid,
1238 : : &relstate_lsn);
880 akapila@postgresql.o 1239 : 207 : CommitTransactionCommand();
1240 : :
1241 : : /* Is the use of a password mandatory? */
1004 1242 [ + + ]: 409 : must_use_password = MySubscription->passwordrequired &&
880 1243 [ - + ]: 202 : !MySubscription->ownersuperuser;
1244 : :
3279 peter_e@gmx.net 1245 [ - + ]: 207 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
3251 fujii@postgresql.org 1246 : 207 : MyLogicalRepWorker->relstate = relstate;
1247 : 207 : MyLogicalRepWorker->relstate_lsn = relstate_lsn;
3279 peter_e@gmx.net 1248 : 207 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1249 : :
1250 : : /*
1251 : : * If synchronization is already done or no longer necessary, exit now
1252 : : * that we've updated shared memory state.
1253 : : */
1977 alvherre@alvh.no-ip. 1254 [ - + ]: 207 : switch (relstate)
1255 : : {
1977 alvherre@alvh.no-ip. 1256 :UBC 0 : case SUBREL_STATE_SYNCDONE:
1257 : : case SUBREL_STATE_READY:
1258 : : case SUBREL_STATE_UNKNOWN:
150 akapila@postgresql.o 1259 :UNC 0 : FinishSyncWorker(); /* doesn't return */
1260 : : }
1261 : :
1262 : : /* Calculate the name of the tablesync slot. */
1854 akapila@postgresql.o 1263 :CBC 207 : slotname = (char *) palloc(NAMEDATALEN);
1264 : 207 : ReplicationSlotNameForTablesync(MySubscription->oid,
1265 : 207 : MyLogicalRepWorker->relid,
1266 : : slotname,
1267 : : NAMEDATALEN);
1268 : :
1269 : : /*
1270 : : * Here we use the slot name instead of the subscription name as the
1271 : : * application_name, so that it is different from the leader apply worker,
1272 : : * so that synchronous replication can distinguish them.
1273 : : */
1768 alvherre@alvh.no-ip. 1274 : 207 : LogRepWorkerWalRcvConn =
769 akapila@postgresql.o 1275 : 207 : walrcv_connect(MySubscription->conninfo, true, true,
1276 : : must_use_password,
1277 : : slotname, &err);
1768 alvherre@alvh.no-ip. 1278 [ - + ]: 207 : if (LogRepWorkerWalRcvConn == NULL)
3279 peter_e@gmx.net 1279 [ # # ]:UBC 0 : ereport(ERROR,
1280 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1281 : : errmsg("table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1282 : : MySubscription->name, err)));
1283 : :
1977 alvherre@alvh.no-ip. 1284 [ + + - + :CBC 207 : Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
- - ]
1285 : : MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
1286 : : MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
1287 : :
1288 : : /* Assign the origin tracking record name. */
1251 akapila@postgresql.o 1289 : 207 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1290 : 207 : MyLogicalRepWorker->relid,
1291 : : originname,
1292 : : sizeof(originname));
1293 : :
1857 1294 [ + + ]: 207 : if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
1295 : : {
1296 : : /*
1297 : : * We have previously errored out before finishing the copy so the
1298 : : * replication slot might exist. We want to remove the slot if it
1299 : : * already exists and proceed.
1300 : : *
1301 : : * XXX We could also instead try to drop the slot, last time we failed
1302 : : * but for that, we might need to clean up the copy state as it might
1303 : : * be in the middle of fetching the rows. Also, if there is a network
1304 : : * breakdown then it wouldn't have succeeded so trying it next time
1305 : : * seems like a better bet.
1306 : : */
1768 alvherre@alvh.no-ip. 1307 : 9 : ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
1308 : : }
1857 akapila@postgresql.o 1309 [ - + ]: 198 : else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
1310 : : {
1311 : : /*
1312 : : * The COPY phase was previously done, but tablesync then crashed
1313 : : * before it was able to finish normally.
1314 : : */
1857 akapila@postgresql.o 1315 :UBC 0 : StartTransactionCommand();
1316 : :
1317 : : /*
1318 : : * The origin tracking name must already exist. It was created first
1319 : : * time this tablesync was launched.
1320 : : */
1321 : 0 : originid = replorigin_by_name(originname, false);
1161 1322 : 0 : replorigin_session_setup(originid, 0);
46 msawada@postgresql.o 1323 :UNC 0 : replorigin_xact_state.origin = originid;
1857 akapila@postgresql.o 1324 :UBC 0 : *origin_startpos = replorigin_session_get_progress(false);
1325 : :
1326 : 0 : CommitTransactionCommand();
1327 : :
1328 : 0 : goto copy_table_done;
1329 : : }
1330 : :
1977 alvherre@alvh.no-ip. 1331 [ - + ]:CBC 207 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1332 : 207 : MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
1333 : 207 : MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
1334 : 207 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1335 : :
1336 : : /*
1337 : : * Update the state, create the replication origin, and make them visible
1338 : : * to others.
1339 : : */
1340 : 207 : StartTransactionCommand();
1341 : 207 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1342 : 207 : MyLogicalRepWorker->relid,
1343 : 207 : MyLogicalRepWorker->relstate,
226 akapila@postgresql.o 1344 : 207 : MyLogicalRepWorker->relstate_lsn,
1345 : : false);
1346 : :
1347 : : /*
1348 : : * Create the replication origin in a separate transaction from the one
1349 : : * that sets up the origin in shared memory. This prevents the risk that
1350 : : * changes to the origin in shared memory cannot be rolled back if the
1351 : : * transaction aborts.
1352 : : */
82 michael@paquier.xyz 1353 : 207 : originid = replorigin_by_name(originname, true);
1354 [ + + ]: 207 : if (!OidIsValid(originid))
1355 : 198 : originid = replorigin_create(originname);
1356 : :
1977 alvherre@alvh.no-ip. 1357 : 207 : CommitTransactionCommand();
1439 andres@anarazel.de 1358 : 207 : pgstat_report_stat(true);
1359 : :
1977 alvherre@alvh.no-ip. 1360 : 207 : StartTransactionCommand();
1361 : :
1362 : : /*
1363 : : * Use a standard write lock here. It might be better to disallow access
1364 : : * to the table while it's being synchronized. But we don't want to block
1365 : : * the main apply process from working and it has to open the relation in
1366 : : * RowExclusiveLock when remapping remote relation id to local one.
1367 : : */
1368 : 207 : rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
1369 : :
1370 : : /*
1371 : : * Start a transaction in the remote node in REPEATABLE READ mode. This
1372 : : * ensures that both the replication slot we create (see below) and the
1373 : : * COPY are consistent with each other.
1374 : : */
1768 1375 : 207 : res = walrcv_exec(LogRepWorkerWalRcvConn,
1376 : : "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1377 : : 0, NULL);
1977 1378 [ - + ]: 207 : if (res->status != WALRCV_OK_COMMAND)
1977 alvherre@alvh.no-ip. 1379 [ # # ]:UBC 0 : ereport(ERROR,
1380 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1381 : : errmsg("table copy could not start transaction on publisher: %s",
1382 : : res->err)));
1977 alvherre@alvh.no-ip. 1383 :CBC 207 : walrcv_clear_result(res);
1384 : :
1385 : : /*
1386 : : * Create a new permanent logical decoding slot. This slot will be used
1387 : : * for the catchup phase after COPY is done, so tell it to use the
1388 : : * snapshot to make the final data consistent.
1389 : : */
1705 akapila@postgresql.o 1390 : 207 : walrcv_create_slot(LogRepWorkerWalRcvConn,
1391 : : slotname, false /* permanent */ , false /* two_phase */ ,
1392 : : MySubscription->failover,
1393 : : CRS_USE_SNAPSHOT, origin_startpos);
1394 : :
1395 : : /*
1396 : : * Advance the origin to the LSN got from walrcv_create_slot and then set
1397 : : * up the origin. The advancement is WAL logged for the purpose of
1398 : : * recovery. Locks are to prevent the replication origin from vanishing
1399 : : * while advancing.
1400 : : *
1401 : : * The purpose of doing these before the copy is to avoid doing the copy
1402 : : * again due to any error in advancing or setting up origin tracking.
1403 : : */
82 michael@paquier.xyz 1404 : 207 : LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1405 : 207 : replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
1406 : : true /* go backward */ , true /* WAL log */ );
1407 : 207 : UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1408 : :
1409 : 207 : replorigin_session_setup(originid, 0);
46 msawada@postgresql.o 1410 :GNC 207 : replorigin_xact_state.origin = originid;
1411 : :
1412 : : /*
1413 : : * If the user did not opt to run as the owner of the subscription
1414 : : * ('run_as_owner'), then copy the table as the owner of the table.
1415 : : */
1010 msawada@postgresql.o 1416 :CBC 207 : run_as_owner = MySubscription->runasowner;
1417 [ + + ]: 207 : if (!run_as_owner)
1418 : 206 : SwitchToUntrustedUser(rel->rd_rel->relowner, &ucxt);
1419 : :
1420 : : /*
1421 : : * Check that our table sync worker has permission to insert into the
1422 : : * target table.
1423 : : */
1424 : 207 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
1425 : : ACL_INSERT);
1426 [ - + ]: 207 : if (aclresult != ACLCHECK_OK)
1010 msawada@postgresql.o 1427 :UBC 0 : aclcheck_error(aclresult,
1428 : 0 : get_relkind_objtype(rel->rd_rel->relkind),
1429 : 0 : RelationGetRelationName(rel));
1430 : :
1431 : : /*
1432 : : * COPY FROM does not honor RLS policies. That is not a problem for
1433 : : * subscriptions owned by roles with BYPASSRLS privilege (or superuser,
1434 : : * who has it implicitly), but other roles should not be able to
1435 : : * circumvent RLS. Disallow logical replication into RLS enabled
1436 : : * relations for such roles.
1437 : : */
1010 msawada@postgresql.o 1438 [ - + ]:CBC 207 : if (check_enable_rls(RelationGetRelid(rel), InvalidOid, false) == RLS_ENABLED)
1010 msawada@postgresql.o 1439 [ # # ]:UBC 0 : ereport(ERROR,
1440 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1441 : : errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1442 : : GetUserNameFromId(GetUserId(), true),
1443 : : RelationGetRelationName(rel))));
1444 : :
1445 : : /* Now do the initial data copy */
1438 tomas.vondra@postgre 1446 :CBC 207 : PushActiveSnapshot(GetTransactionSnapshot());
1447 : 207 : copy_table(rel);
1448 : 193 : PopActiveSnapshot();
1449 : :
1768 alvherre@alvh.no-ip. 1450 : 193 : res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
1977 1451 [ - + ]: 193 : if (res->status != WALRCV_OK_COMMAND)
1977 alvherre@alvh.no-ip. 1452 [ # # ]:UBC 0 : ereport(ERROR,
1453 : : (errcode(ERRCODE_CONNECTION_FAILURE),
1454 : : errmsg("table copy could not finish transaction on publisher: %s",
1455 : : res->err)));
1977 alvherre@alvh.no-ip. 1456 :CBC 193 : walrcv_clear_result(res);
1457 : :
999 tgl@sss.pgh.pa.us 1458 [ + + ]: 193 : if (!run_as_owner)
1010 msawada@postgresql.o 1459 : 192 : RestoreUserContext(&ucxt);
1460 : :
1977 alvherre@alvh.no-ip. 1461 : 193 : table_close(rel, NoLock);
1462 : :
1463 : : /* Make the copy visible. */
1464 : 193 : CommandCounterIncrement();
1465 : :
1466 : : /*
1467 : : * Update the persisted state to indicate the COPY phase is done; make it
1468 : : * visible to others.
1469 : : */
1857 akapila@postgresql.o 1470 : 193 : UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
1471 : 193 : MyLogicalRepWorker->relid,
1472 : : SUBREL_STATE_FINISHEDCOPY,
226 1473 : 193 : MyLogicalRepWorker->relstate_lsn,
1474 : : false);
1475 : :
1857 1476 : 193 : CommitTransactionCommand();
1477 : :
1478 : 193 : copy_table_done:
1479 : :
1480 [ - + ]: 193 : elog(DEBUG1,
1481 : : "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1482 : : originname, LSN_FORMAT_ARGS(*origin_startpos));
1483 : :
1484 : : /*
1485 : : * We are done with the initial data synchronization, update the state.
1486 : : */
1977 alvherre@alvh.no-ip. 1487 [ - + ]: 193 : SpinLockAcquire(&MyLogicalRepWorker->relmutex);
1488 : 193 : MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
1489 : 193 : MyLogicalRepWorker->relstate_lsn = *origin_startpos;
1490 : 193 : SpinLockRelease(&MyLogicalRepWorker->relmutex);
1491 : :
1492 : : /*
1493 : : * Finally, wait until the leader apply worker tells us to catch up and
1494 : : * then return to let LogicalRepApplyLoop do it.
1495 : : */
1496 : 193 : wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
3279 peter_e@gmx.net 1497 : 193 : return slotname;
1498 : : }
1499 : :
1500 : : /*
1501 : : * Execute the initial sync with error handling. Disable the subscription,
1502 : : * if it's required.
1503 : : *
1504 : : * Allocate the slot name in long-lived context on return. Note that we don't
1505 : : * handle FATAL errors which are probably because of system resource error and
1506 : : * are not repeatable.
1507 : : */
1508 : : static void
955 akapila@postgresql.o 1509 : 207 : start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
1510 : : {
1511 : 207 : char *sync_slotname = NULL;
1512 : :
1513 [ - + ]: 207 : Assert(am_tablesync_worker());
1514 : :
1515 [ + + ]: 207 : PG_TRY();
1516 : : {
1517 : : /* Call initial sync. */
1518 : 207 : sync_slotname = LogicalRepSyncTableStart(origin_startpos);
1519 : : }
1520 : 13 : PG_CATCH();
1521 : : {
1522 [ + + ]: 13 : if (MySubscription->disableonerr)
1523 : 1 : DisableSubscriptionAndExit();
1524 : : else
1525 : : {
1526 : : /*
1527 : : * Report the worker failed during table synchronization. Abort
1528 : : * the current transaction so that the stats message is sent in an
1529 : : * idle state.
1530 : : */
1531 : 12 : AbortOutOfAnyTransaction();
23 akapila@postgresql.o 1532 :GNC 12 : pgstat_report_subscription_error(MySubscription->oid);
1533 : :
955 akapila@postgresql.o 1534 :CBC 12 : PG_RE_THROW();
1535 : : }
1536 : : }
1537 [ - + ]: 193 : PG_END_TRY();
1538 : :
1539 : : /* allocate slot name in long-lived context */
1540 : 193 : *slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
1541 : 193 : pfree(sync_slotname);
1542 : 193 : }
1543 : :
1544 : : /*
1545 : : * Runs the tablesync worker.
1546 : : *
1547 : : * It starts syncing tables. After a successful sync, sets streaming options
1548 : : * and starts streaming to catchup with apply worker.
1549 : : */
1550 : : static void
102 nathan@postgresql.or 1551 :GNC 207 : run_tablesync_worker(void)
1552 : : {
1553 : : char originname[NAMEDATALEN];
955 akapila@postgresql.o 1554 :CBC 207 : XLogRecPtr origin_startpos = InvalidXLogRecPtr;
1555 : 207 : char *slotname = NULL;
1556 : : WalRcvStreamOptions options;
1557 : :
1558 : 207 : start_table_sync(&origin_startpos, &slotname);
1559 : :
1560 : 193 : ReplicationOriginNameForLogicalRep(MySubscription->oid,
1561 : 193 : MyLogicalRepWorker->relid,
1562 : : originname,
1563 : : sizeof(originname));
1564 : :
1565 : 193 : set_apply_error_context_origin(originname);
1566 : :
1567 : 193 : set_stream_options(&options, slotname, &origin_startpos);
1568 : :
1569 : 193 : walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
1570 : :
1571 : : /* Apply the changes till we catchup with the apply worker. */
1572 : 193 : start_apply(origin_startpos);
955 akapila@postgresql.o 1573 :UBC 0 : }
1574 : :
1575 : : /* Logical Replication Tablesync worker entry point */
1576 : : void
130 akapila@postgresql.o 1577 :GNC 208 : TableSyncWorkerMain(Datum main_arg)
1578 : : {
955 akapila@postgresql.o 1579 :CBC 208 : int worker_slot = DatumGetInt32(main_arg);
1580 : :
1581 : 208 : SetupApplyOrSyncWorker(worker_slot);
1582 : :
1583 : 207 : run_tablesync_worker();
1584 : :
150 akapila@postgresql.o 1585 :UNC 0 : FinishSyncWorker();
1586 : : }
1587 : :
1588 : : /*
1589 : : * If the subscription has no tables then return false.
1590 : : *
1591 : : * Otherwise, are all tablesyncs READY?
1592 : : *
1593 : : * Note: This function is not suitable to be called from outside of apply or
1594 : : * tablesync workers because MySubscription needs to be already initialized.
1595 : : */
1596 : : bool
1705 akapila@postgresql.o 1597 :CBC 201 : AllTablesyncsReady(void)
1598 : : {
1599 : : bool started_tx;
1600 : : bool has_tables;
1601 : :
1602 : : /* We need up-to-date sync state info for subscription tables here. */
130 akapila@postgresql.o 1603 :GNC 201 : FetchRelationStates(&has_tables, NULL, &started_tx);
1604 : :
1705 akapila@postgresql.o 1605 [ + + ]:CBC 201 : if (started_tx)
1606 : : {
1607 : 15 : CommitTransactionCommand();
1439 andres@anarazel.de 1608 : 15 : pgstat_report_stat(true);
1609 : : }
1610 : :
1611 : : /*
1612 : : * Return false when there are no tables in subscription or not all tables
1613 : : * are in ready state; true otherwise.
1614 : : */
130 akapila@postgresql.o 1615 [ + - + + ]:GNC 201 : return has_tables && (table_states_not_ready == NIL);
1616 : : }
1617 : :
1618 : : /*
1619 : : * Return whether the subscription currently has any tables.
1620 : : *
1621 : : * Note: Unlike HasSubscriptionTables(), this function relies on cached
1622 : : * information for subscription tables. Additionally, it should not be
1623 : : * invoked outside of apply or tablesync workers, as MySubscription must be
1624 : : * initialized first.
1625 : : */
1626 : : bool
150 1627 : 117 : HasSubscriptionTablesCached(void)
1628 : : {
1629 : : bool started_tx;
1630 : : bool has_tables;
1631 : :
1632 : : /* We need up-to-date subscription tables info here */
130 1633 : 117 : FetchRelationStates(&has_tables, NULL, &started_tx);
1634 : :
188 1635 [ - + ]: 117 : if (started_tx)
1636 : : {
188 akapila@postgresql.o 1637 :UNC 0 : CommitTransactionCommand();
1638 : 0 : pgstat_report_stat(true);
1639 : : }
1640 : :
130 akapila@postgresql.o 1641 :GNC 117 : return has_tables;
1642 : : }
1643 : :
1644 : : /*
1645 : : * Update the two_phase state of the specified subscription in pg_subscription.
1646 : : */
1647 : : void
1705 akapila@postgresql.o 1648 :CBC 9 : UpdateTwoPhaseState(Oid suboid, char new_state)
1649 : : {
1650 : : Relation rel;
1651 : : HeapTuple tup;
1652 : : bool nulls[Natts_pg_subscription];
1653 : : bool replaces[Natts_pg_subscription];
1654 : : Datum values[Natts_pg_subscription];
1655 : :
1656 [ + - + - : 9 : Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
- + ]
1657 : : new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1658 : : new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1659 : :
1660 : 9 : rel = table_open(SubscriptionRelationId, RowExclusiveLock);
1661 : 9 : tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid));
1662 [ - + ]: 9 : if (!HeapTupleIsValid(tup))
1705 akapila@postgresql.o 1663 [ # # ]:UBC 0 : elog(ERROR,
1664 : : "cache lookup failed for subscription oid %u",
1665 : : suboid);
1666 : :
1667 : : /* Form a new tuple. */
1705 akapila@postgresql.o 1668 :CBC 9 : memset(values, 0, sizeof(values));
1669 : 9 : memset(nulls, false, sizeof(nulls));
1670 : 9 : memset(replaces, false, sizeof(replaces));
1671 : :
1672 : : /* And update/set two_phase state */
1673 : 9 : values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state);
1674 : 9 : replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
1675 : :
1676 : 9 : tup = heap_modify_tuple(tup, RelationGetDescr(rel),
1677 : : values, nulls, replaces);
1678 : 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1679 : :
1680 : 9 : heap_freetuple(tup);
1681 : 9 : table_close(rel, RowExclusiveLock);
1682 : 9 : }
|