Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * sequencesync.c
3 : : * PostgreSQL logical replication: sequence synchronization
4 : : *
5 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/sequencesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for sequence synchronization for
12 : : * logical replication.
13 : : *
14 : : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : : * catalog.
16 : : *
17 : : * Sequences to be synchronized will be added with state INIT when either of
18 : : * the following commands is executed:
19 : : * CREATE SUBSCRIPTION
20 : : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : : *
22 : : * Executing the following command resets all sequences in the subscription to
23 : : * state INIT, triggering re-synchronization:
24 : : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : : *
26 : : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : : * to handle synchronization.
29 : : *
30 : : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : : * It begins by retrieving the list of sequences that are flagged for
32 : : * synchronization, i.e., those in the INIT state. These sequences are then
33 : : * processed in batches, allowing multiple entries to be synchronized within a
34 : : * single transaction. The worker fetches the current sequence values and page
35 : : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : : * local subscriber, and finally marks each sequence as READY upon successful
37 : : * synchronization.
38 : : *
39 : : * Sequence state transitions follow this pattern:
40 : : * INIT -> READY
41 : : *
42 : : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : : * sequences are synchronized per transaction. The locks on the sequence
44 : : * relation will be periodically released at each transaction commit.
45 : : *
45 : : * XXX: The launcher process was not chosen to manage launching the
46 : : * sequencesync worker because it does not have a database connection and so
47 : : * cannot read pg_subscription_rel to find the sequences that need syncing.
49 : : *-------------------------------------------------------------------------
50 : : */
51 : :
52 : : #include "postgres.h"
53 : :
54 : : #include "access/genam.h"
55 : : #include "access/table.h"
56 : : #include "catalog/pg_sequence.h"
57 : : #include "catalog/pg_subscription_rel.h"
58 : : #include "commands/sequence.h"
59 : : #include "pgstat.h"
60 : : #include "postmaster/interrupt.h"
61 : : #include "replication/logicalworker.h"
62 : : #include "replication/worker_internal.h"
63 : : #include "storage/lwlock.h"
64 : : #include "utils/acl.h"
65 : : #include "utils/builtins.h"
66 : : #include "utils/fmgroids.h"
67 : : #include "utils/guc.h"
68 : : #include "utils/inval.h"
69 : : #include "utils/lsyscache.h"
70 : : #include "utils/memutils.h"
71 : : #include "utils/pg_lsn.h"
72 : : #include "utils/syscache.h"
73 : : #include "utils/usercontext.h"
74 : :
/*
 * Number of columns in each row returned by the publisher query built in
 * copy_sequences(); get_and_validate_seq_info() asserts that it consumed
 * exactly this many.
 */
#define REMOTE_SEQ_COL_COUNT 10

/*
 * Outcome of trying to synchronize a single sequence.
 */
typedef enum CopySeqResult
{
	/* Local sequence was updated and marked READY. */
	COPYSEQ_SUCCESS,

	/* Local definition differs from the remote one, or it was renamed. */
	COPYSEQ_MISMATCH,

	/* The acting role lacks UPDATE privilege on the local sequence. */
	COPYSEQ_INSUFFICIENT_PERM,

	/* Remote data was NULL, or the local sequence could not be opened. */
	COPYSEQ_SKIPPED
} CopySeqResult;
84 : :
85 : : static List *seqinfos = NIL;
86 : :
87 : : /*
88 : : * Apply worker determines if sequence synchronization is needed.
89 : : *
90 : : * Start a sequencesync worker if one is not already running. The active
91 : : * sequencesync worker will handle all pending sequence synchronization. If any
92 : : * sequences remain unsynchronized after it exits, a new worker can be started
93 : : * in the next iteration.
94 : : */
95 : : void
181 akapila@postgresql.o 96 :GNC 5019 : ProcessSequencesForSync(void)
97 : : {
98 : : LogicalRepWorker *sequencesync_worker;
99 : : int nsyncworkers;
100 : : bool has_pending_sequences;
101 : : bool started_tx;
102 : :
103 : 5019 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
104 : :
105 [ + + ]: 5019 : if (started_tx)
106 : : {
107 : 190 : CommitTransactionCommand();
108 : 190 : pgstat_report_stat(true);
109 : : }
110 : :
111 [ + + ]: 5019 : if (!has_pending_sequences)
112 : 4996 : return;
113 : :
114 : 37 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
115 : :
116 : : /* Check if there is a sequencesync worker already running? */
117 : 37 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
118 : 37 : MyLogicalRepWorker->subid,
119 : : InvalidOid, true);
120 [ + + ]: 37 : if (sequencesync_worker)
121 : : {
122 : 14 : LWLockRelease(LogicalRepWorkerLock);
123 : 14 : return;
124 : : }
125 : :
126 : : /*
127 : : * Count running sync workers for this subscription, while we have the
128 : : * lock.
129 : : */
130 : 23 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
131 : 23 : LWLockRelease(LogicalRepWorkerLock);
132 : :
133 : : /*
134 : : * It is okay to read/update last_seqsync_start_time here in apply worker
135 : : * as we have already ensured that sync worker doesn't exist.
136 : : */
137 : 23 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
138 : 23 : &MyLogicalRepWorker->last_seqsync_start_time);
139 : : }
140 : :
141 : : /*
142 : : * get_sequences_string
143 : : *
144 : : * Build a comma-separated string of schema-qualified sequence names
145 : : * for the given list of sequence indexes.
146 : : */
147 : : static void
148 : 5 : get_sequences_string(List *seqindexes, StringInfo buf)
149 : : {
150 : 5 : resetStringInfo(buf);
151 [ + - + + : 15 : foreach_int(seqidx, seqindexes)
+ + ]
152 : : {
153 : : LogicalRepSequenceInfo *seqinfo =
154 : 5 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
155 : :
156 [ - + ]: 5 : if (buf->len > 0)
181 akapila@postgresql.o 157 :UNC 0 : appendStringInfoString(buf, ", ");
158 : :
181 akapila@postgresql.o 159 :GNC 5 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
160 : : }
161 : 5 : }
162 : :
163 : : /*
164 : : * report_sequence_errors
165 : : *
166 : : * Report discrepancies found during sequence synchronization between
167 : : * the publisher and subscriber. Emits warnings for:
168 : : * a) mismatched definitions or concurrent rename
169 : : * b) insufficient privileges
170 : : * c) missing sequences on the subscriber
171 : : * Then raises an ERROR to indicate synchronization failure.
172 : : */
173 : : static void
174 : 10 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
175 : : List *missing_seqs_idx)
176 : : {
177 : : StringInfoData seqstr;
178 : :
179 : : /* Quick exit if there are no errors to report */
180 [ + + + - : 10 : if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
+ + ]
181 : 5 : return;
182 : :
22 drowley@postgresql.o 183 : 5 : initStringInfo(&seqstr);
184 : :
181 akapila@postgresql.o 185 [ + + ]: 5 : if (mismatched_seqs_idx)
186 : : {
22 drowley@postgresql.o 187 : 3 : get_sequences_string(mismatched_seqs_idx, &seqstr);
181 akapila@postgresql.o 188 [ + - ]: 3 : ereport(WARNING,
189 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
190 : : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
191 : : "mismatched or renamed sequences on subscriber (%s)",
192 : : list_length(mismatched_seqs_idx),
193 : : seqstr.data));
194 : : }
195 : :
196 [ - + ]: 5 : if (insuffperm_seqs_idx)
197 : : {
22 drowley@postgresql.o 198 :UNC 0 : get_sequences_string(insuffperm_seqs_idx, &seqstr);
181 akapila@postgresql.o 199 [ # # ]: 0 : ereport(WARNING,
200 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
201 : : errmsg_plural("insufficient privileges on sequence (%s)",
202 : : "insufficient privileges on sequences (%s)",
203 : : list_length(insuffperm_seqs_idx),
204 : : seqstr.data));
205 : : }
206 : :
181 akapila@postgresql.o 207 [ + + ]:GNC 5 : if (missing_seqs_idx)
208 : : {
22 drowley@postgresql.o 209 : 2 : get_sequences_string(missing_seqs_idx, &seqstr);
181 akapila@postgresql.o 210 [ + - ]: 2 : ereport(WARNING,
211 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
212 : : errmsg_plural("missing sequence on publisher (%s)",
213 : : "missing sequences on publisher (%s)",
214 : : list_length(missing_seqs_idx),
215 : : seqstr.data));
216 : : }
217 : :
218 [ + - ]: 5 : ereport(ERROR,
219 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
220 : : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
221 : : MySubscription->name));
222 : : }
223 : :
224 : : /*
225 : : * get_and_validate_seq_info
226 : : *
227 : : * Extracts remote sequence information from the tuple slot received from the
228 : : * publisher, and validates it against the corresponding local sequence
229 : : * definition.
230 : : */
231 : : static CopySeqResult
232 : 17 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
233 : : LogicalRepSequenceInfo **seqinfo, int *seqidx)
234 : : {
235 : : bool isnull;
236 : 17 : int col = 0;
237 : : Datum datum;
238 : : Oid remote_typid;
239 : : int64 remote_start;
240 : : int64 remote_increment;
241 : : int64 remote_min;
242 : : int64 remote_max;
243 : : bool remote_cycle;
244 : 17 : CopySeqResult result = COPYSEQ_SUCCESS;
245 : : HeapTuple tup;
246 : : Form_pg_sequence local_seq;
247 : : LogicalRepSequenceInfo *seqinfo_local;
248 : :
249 : 17 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
250 [ - + ]: 17 : Assert(!isnull);
251 : :
252 : : /* Identify the corresponding local sequence for the given index. */
253 : 17 : *seqinfo = seqinfo_local =
254 : 17 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
255 : :
256 : : /*
257 : : * The sequence data can be NULL due to insufficient privileges or if the
258 : : * sequence was dropped concurrently (see pg_get_sequence_data()).
259 : : */
105 260 : 17 : datum = slot_getattr(slot, ++col, &isnull);
261 [ + + ]: 17 : if (isnull)
262 : 1 : return COPYSEQ_SKIPPED;
263 : 16 : seqinfo_local->last_value = DatumGetInt64(datum);
264 : :
181 265 : 16 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
266 [ - + ]: 16 : Assert(!isnull);
267 : :
268 : 16 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
269 [ - + ]: 16 : Assert(!isnull);
270 : :
271 : 16 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
272 [ - + ]: 16 : Assert(!isnull);
273 : :
274 : 16 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
275 [ - + ]: 16 : Assert(!isnull);
276 : :
277 : 16 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
278 [ - + ]: 16 : Assert(!isnull);
279 : :
280 : 16 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
281 [ - + ]: 16 : Assert(!isnull);
282 : :
283 : 16 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
284 [ - + ]: 16 : Assert(!isnull);
285 : :
286 : 16 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
287 [ - + ]: 16 : Assert(!isnull);
288 : :
289 : : /* Sanity check */
290 [ - + ]: 16 : Assert(col == REMOTE_SEQ_COL_COUNT);
291 : :
292 : 16 : seqinfo_local->found_on_pub = true;
293 : :
294 : 16 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
295 : :
296 : : /* Sequence was concurrently dropped? */
297 [ - + ]: 16 : if (!*sequence_rel)
181 akapila@postgresql.o 298 :UNC 0 : return COPYSEQ_SKIPPED;
299 : :
181 akapila@postgresql.o 300 :GNC 16 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
301 : :
302 : : /* Sequence was concurrently dropped? */
303 [ - + ]: 16 : if (!HeapTupleIsValid(tup))
181 akapila@postgresql.o 304 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for sequence %u",
305 : : seqinfo_local->localrelid);
306 : :
181 akapila@postgresql.o 307 :GNC 16 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
308 : :
309 : : /* Sequence parameters for remote/local are the same? */
310 [ + - ]: 16 : if (local_seq->seqtypid != remote_typid ||
311 [ + + ]: 16 : local_seq->seqstart != remote_start ||
312 [ + + ]: 15 : local_seq->seqincrement != remote_increment ||
313 [ + - ]: 13 : local_seq->seqmin != remote_min ||
314 [ + - ]: 13 : local_seq->seqmax != remote_max ||
315 [ - + ]: 13 : local_seq->seqcycle != remote_cycle)
316 : 3 : result = COPYSEQ_MISMATCH;
317 : :
318 : : /* Sequence was concurrently renamed? */
319 [ + - ]: 16 : if (strcmp(seqinfo_local->nspname,
320 : 16 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
321 [ - + ]: 16 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
181 akapila@postgresql.o 322 :UNC 0 : result = COPYSEQ_MISMATCH;
323 : :
181 akapila@postgresql.o 324 :GNC 16 : ReleaseSysCache(tup);
325 : 16 : return result;
326 : : }
327 : :
328 : : /*
329 : : * Apply remote sequence state to local sequence and mark it as
330 : : * synchronized (READY).
331 : : */
332 : : static CopySeqResult
333 : 13 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
334 : : {
335 : : UserContext ucxt;
336 : : AclResult aclresult;
337 : 13 : bool run_as_owner = MySubscription->runasowner;
338 : 13 : Oid seqoid = seqinfo->localrelid;
339 : :
340 : : /*
341 : : * If the user did not opt to run as the owner of the subscription
342 : : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
343 : : */
344 [ + - ]: 13 : if (!run_as_owner)
345 : 13 : SwitchToUntrustedUser(seqowner, &ucxt);
346 : :
347 : 13 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
348 : :
349 [ - + ]: 13 : if (aclresult != ACLCHECK_OK)
350 : : {
181 akapila@postgresql.o 351 [ # # ]:UNC 0 : if (!run_as_owner)
352 : 0 : RestoreUserContext(&ucxt);
353 : :
354 : 0 : return COPYSEQ_INSUFFICIENT_PERM;
355 : : }
356 : :
357 : : /*
358 : : * The log counter (log_cnt) tracks how many sequence values are still
359 : : * unused locally. It is only relevant to the local node and managed
360 : : * internally by nextval() when allocating new ranges. Since log_cnt does
361 : : * not affect the visible sequence state (like last_value or is_called)
362 : : * and is only used for local caching, it need not be copied to the
363 : : * subscriber during synchronization.
364 : : */
181 akapila@postgresql.o 365 :GNC 13 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
366 : :
367 [ + - ]: 13 : if (!run_as_owner)
368 : 13 : RestoreUserContext(&ucxt);
369 : :
370 : : /*
371 : : * Record the remote sequence's LSN in pg_subscription_rel and mark the
372 : : * sequence as READY.
373 : : */
374 : 13 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
375 : : seqinfo->page_lsn, false);
376 : :
377 : 13 : return COPYSEQ_SUCCESS;
378 : : }
379 : :
380 : : /*
381 : : * Copy existing data of sequences from the publisher.
382 : : */
383 : : static void
384 : 10 : copy_sequences(WalReceiverConn *conn)
385 : : {
386 : 10 : int cur_batch_base_index = 0;
387 : 10 : int n_seqinfos = list_length(seqinfos);
388 : 10 : List *mismatched_seqs_idx = NIL;
389 : 10 : List *missing_seqs_idx = NIL;
390 : 10 : List *insuffperm_seqs_idx = NIL;
391 : : StringInfoData seqstr;
392 : : StringInfoData cmd;
393 : : MemoryContext oldctx;
394 : :
22 drowley@postgresql.o 395 : 10 : initStringInfo(&seqstr);
396 : 10 : initStringInfo(&cmd);
397 : :
398 : : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
399 : :
181 akapila@postgresql.o 400 [ - + ]: 10 : elog(DEBUG1,
401 : : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
402 : : MySubscription->name, n_seqinfos);
403 : :
404 [ + + ]: 20 : while (cur_batch_base_index < n_seqinfos)
405 : : {
406 : 10 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
407 : : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
408 : 10 : int batch_size = 0;
409 : 10 : int batch_succeeded_count = 0;
410 : 10 : int batch_mismatched_count = 0;
411 : 10 : int batch_skipped_count = 0;
412 : 10 : int batch_insuffperm_count = 0;
413 : : int batch_missing_count;
414 : :
415 : : WalRcvExecResult *res;
416 : : TupleTableSlot *slot;
417 : :
418 : 10 : StartTransactionCommand();
419 : :
420 [ + + ]: 28 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
421 : : {
422 : : char *nspname_literal;
423 : : char *seqname_literal;
424 : :
425 : : LogicalRepSequenceInfo *seqinfo =
426 : 18 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
427 : :
22 drowley@postgresql.o 428 [ + + ]: 18 : if (seqstr.len > 0)
429 : 8 : appendStringInfoString(&seqstr, ", ");
430 : :
180 akapila@postgresql.o 431 : 18 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
432 : 18 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
433 : :
22 drowley@postgresql.o 434 : 18 : appendStringInfo(&seqstr, "(%s, %s, %d)",
435 : : nspname_literal, seqname_literal, idx);
436 : :
181 akapila@postgresql.o 437 [ - + ]: 18 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
181 akapila@postgresql.o 438 :UNC 0 : break;
439 : : }
440 : :
441 : : /*
442 : : * We deliberately avoid acquiring a local lock on the sequence before
443 : : * querying the publisher to prevent potential distributed deadlocks
444 : : * in bi-directional replication setups.
445 : : *
446 : : * Example scenario:
447 : : *
448 : : * - On each node, a background worker acquires a lock on a sequence
449 : : * as part of a sync operation.
450 : : *
451 : : * - Concurrently, a user transaction attempts to alter the same
452 : : * sequence, waiting on the background worker's lock.
453 : : *
454 : : * - Meanwhile, a query from the other node tries to access metadata
455 : : * that depends on the completion of the alter operation.
456 : : *
457 : : * - This creates a circular wait across nodes:
458 : : *
459 : : * Node-1: Query -> waits on Alter -> waits on Sync Worker
460 : : *
461 : : * Node-2: Query -> waits on Alter -> waits on Sync Worker
462 : : *
463 : : * Since each node only sees part of the wait graph, the deadlock may
464 : : * go undetected, leading to indefinite blocking.
465 : : *
466 : : * Note: Each entry in VALUES includes an index 'seqidx' that
467 : : * represents the sequence's position in the local 'seqinfos' list.
468 : : * This index is propagated to the query results and later used to
469 : : * directly map the fetched publisher sequence rows back to their
470 : : * corresponding local entries without relying on result order or name
471 : : * matching.
472 : : */
22 drowley@postgresql.o 473 :GNC 10 : appendStringInfo(&cmd,
474 : : "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
475 : : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
476 : : " seq.seqmax, seq.seqcycle\n"
477 : : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
478 : : "JOIN pg_namespace n ON n.nspname = s.schname\n"
479 : : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
480 : : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
481 : : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
482 : : seqstr.data);
483 : :
484 : 10 : res = walrcv_exec(conn, cmd.data, lengthof(seqRow), seqRow);
181 akapila@postgresql.o 485 [ - + ]: 10 : if (res->status != WALRCV_OK_TUPLES)
181 akapila@postgresql.o 486 [ # # ]:UNC 0 : ereport(ERROR,
487 : : errcode(ERRCODE_CONNECTION_FAILURE),
488 : : errmsg("could not fetch sequence information from the publisher: %s",
489 : : res->err));
490 : :
181 akapila@postgresql.o 491 :GNC 10 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
492 [ + + ]: 27 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
493 : : {
494 : : CopySeqResult sync_status;
495 : : LogicalRepSequenceInfo *seqinfo;
5 496 : 17 : Relation sequence_rel = NULL;
497 : : int seqidx;
498 : :
181 499 [ - + ]: 17 : CHECK_FOR_INTERRUPTS();
500 : :
501 [ - + ]: 17 : if (ConfigReloadPending)
502 : : {
181 akapila@postgresql.o 503 :UNC 0 : ConfigReloadPending = false;
504 : 0 : ProcessConfigFile(PGC_SIGHUP);
505 : : }
506 : :
181 akapila@postgresql.o 507 :GNC 17 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
508 : : &seqinfo, &seqidx);
509 [ + + ]: 17 : if (sync_status == COPYSEQ_SUCCESS)
510 : 13 : sync_status = copy_sequence(seqinfo,
511 : 13 : sequence_rel->rd_rel->relowner);
512 : :
513 [ + + - + : 17 : switch (sync_status)
- ]
514 : : {
515 : 13 : case COPYSEQ_SUCCESS:
516 [ - + ]: 13 : elog(DEBUG1,
517 : : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
518 : : MySubscription->name, seqinfo->nspname,
519 : : seqinfo->seqname);
520 : 13 : batch_succeeded_count++;
521 : 13 : break;
522 : 3 : case COPYSEQ_MISMATCH:
523 : :
524 : : /*
525 : : * Remember mismatched sequences in a long-lived memory
526 : : * context since these will be used after the transaction
527 : : * is committed.
528 : : */
529 : 3 : oldctx = MemoryContextSwitchTo(ApplyContext);
530 : 3 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
531 : : seqidx);
532 : 3 : MemoryContextSwitchTo(oldctx);
533 : 3 : batch_mismatched_count++;
534 : 3 : break;
181 akapila@postgresql.o 535 :UNC 0 : case COPYSEQ_INSUFFICIENT_PERM:
536 : :
537 : : /*
538 : : * Remember sequences with insufficient privileges in a
539 : : * long-lived memory context since these will be used
540 : : * after the transaction is committed.
541 : : */
542 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
543 : 0 : insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
544 : : seqidx);
545 : 0 : MemoryContextSwitchTo(oldctx);
546 : 0 : batch_insuffperm_count++;
547 : 0 : break;
181 akapila@postgresql.o 548 :GNC 1 : case COPYSEQ_SKIPPED:
549 : :
550 : : /*
551 : : * Concurrent removal of a sequence on the subscriber is
552 : : * treated as success, since the only viable action is to
553 : : * skip the corresponding sequence data. Missing sequences
554 : : * on the publisher are treated as ERROR.
555 : : */
105 556 [ - + ]: 1 : if (seqinfo->found_on_pub)
557 : : {
105 akapila@postgresql.o 558 [ # # ]:UNC 0 : ereport(LOG,
559 : : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
560 : : seqinfo->nspname,
561 : : seqinfo->seqname));
562 : 0 : batch_skipped_count++;
563 : : }
181 akapila@postgresql.o 564 :GNC 1 : break;
565 : : }
566 : :
567 [ + + ]: 17 : if (sequence_rel)
568 : 16 : table_close(sequence_rel, NoLock);
569 : : }
570 : :
571 : 10 : ExecDropSingleTupleTableSlot(slot);
572 : 10 : walrcv_clear_result(res);
22 drowley@postgresql.o 573 : 10 : resetStringInfo(&seqstr);
574 : 10 : resetStringInfo(&cmd);
575 : :
181 akapila@postgresql.o 576 : 10 : batch_missing_count = batch_size - (batch_succeeded_count +
577 : 10 : batch_mismatched_count +
578 : 10 : batch_insuffperm_count +
579 : : batch_skipped_count);
580 : :
581 [ - + ]: 10 : elog(DEBUG1,
582 : : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
583 : : MySubscription->name,
584 : : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
585 : : batch_size, batch_succeeded_count, batch_mismatched_count,
586 : : batch_insuffperm_count, batch_missing_count, batch_skipped_count);
587 : :
588 : : /* Commit this batch, and prepare for next batch */
589 : 10 : CommitTransactionCommand();
590 : :
591 [ + + ]: 10 : if (batch_missing_count)
592 : : {
593 [ + + ]: 8 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
594 : : {
595 : : LogicalRepSequenceInfo *seqinfo =
596 : 6 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
597 : :
598 : : /* If the sequence was not found on publisher, record it */
599 [ + + ]: 6 : if (!seqinfo->found_on_pub)
600 : 2 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
601 : : }
602 : : }
603 : :
604 : : /*
605 : : * cur_batch_base_index is not incremented sequentially because some
606 : : * sequences may be missing, and the number of fetched rows may not
607 : : * match the batch size.
608 : : */
609 : 10 : cur_batch_base_index += batch_size;
610 : : }
611 : :
612 : : /* Report mismatches, permission issues, or missing sequences */
613 : 10 : report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
614 : : missing_seqs_idx);
615 : 5 : }
616 : :
617 : : /*
618 : : * Identifies sequences that require synchronization and initiates the
619 : : * synchronization process.
620 : : */
621 : : static void
622 : 10 : LogicalRepSyncSequences(void)
623 : : {
624 : : char *err;
625 : : bool must_use_password;
626 : : Relation rel;
627 : : HeapTuple tup;
628 : : ScanKeyData skey[2];
629 : : SysScanDesc scan;
630 : 10 : Oid subid = MyLogicalRepWorker->subid;
631 : : StringInfoData app_name;
632 : :
633 : 10 : StartTransactionCommand();
634 : :
635 : 10 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
636 : :
637 : 10 : ScanKeyInit(&skey[0],
638 : : Anum_pg_subscription_rel_srsubid,
639 : : BTEqualStrategyNumber, F_OIDEQ,
640 : : ObjectIdGetDatum(subid));
641 : :
642 : 10 : ScanKeyInit(&skey[1],
643 : : Anum_pg_subscription_rel_srsubstate,
644 : : BTEqualStrategyNumber, F_CHAREQ,
645 : : CharGetDatum(SUBREL_STATE_INIT));
646 : :
647 : 10 : scan = systable_beginscan(rel, InvalidOid, false,
648 : : NULL, 2, skey);
649 [ + + ]: 29 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
650 : : {
651 : : Form_pg_subscription_rel subrel;
652 : : LogicalRepSequenceInfo *seq;
653 : : Relation sequence_rel;
654 : : MemoryContext oldctx;
655 : :
656 [ - + ]: 19 : CHECK_FOR_INTERRUPTS();
657 : :
658 : 19 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
659 : :
660 : 19 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
661 : :
662 : : /* Skip if sequence was dropped concurrently */
663 [ - + ]: 19 : if (!sequence_rel)
181 akapila@postgresql.o 664 :UNC 0 : continue;
665 : :
666 : : /* Skip if the relation is not a sequence */
181 akapila@postgresql.o 667 [ + + ]:GNC 19 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
668 : : {
669 : 1 : table_close(sequence_rel, NoLock);
670 : 1 : continue;
671 : : }
672 : :
673 : : /*
674 : : * Worker needs to process sequences across transaction boundary, so
675 : : * allocate them under long-lived context.
676 : : */
677 : 18 : oldctx = MemoryContextSwitchTo(ApplyContext);
678 : :
679 : 18 : seq = palloc0_object(LogicalRepSequenceInfo);
680 : 18 : seq->localrelid = subrel->srrelid;
681 : 18 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
682 : 18 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
683 : 18 : seqinfos = lappend(seqinfos, seq);
684 : :
685 : 18 : MemoryContextSwitchTo(oldctx);
686 : :
687 : 18 : table_close(sequence_rel, NoLock);
688 : : }
689 : :
690 : : /* Cleanup */
691 : 10 : systable_endscan(scan);
692 : 10 : table_close(rel, AccessShareLock);
693 : :
694 : 10 : CommitTransactionCommand();
695 : :
696 : : /*
697 : : * Exit early if no catalog entries found, likely due to concurrent drops.
698 : : */
699 [ - + ]: 10 : if (!seqinfos)
181 akapila@postgresql.o 700 :UNC 0 : return;
701 : :
702 : : /* Is the use of a password mandatory? */
181 akapila@postgresql.o 703 [ + - ]:GNC 20 : must_use_password = MySubscription->passwordrequired &&
704 [ - + ]: 10 : !MySubscription->ownersuperuser;
705 : :
706 : 10 : initStringInfo(&app_name);
707 : 10 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
708 : 10 : MySubscription->oid, GetSystemIdentifier());
709 : :
710 : : /*
711 : : * Establish the connection to the publisher for sequence synchronization.
712 : : */
713 : 10 : LogRepWorkerWalRcvConn =
714 : 10 : walrcv_connect(MySubscription->conninfo, true, true,
715 : : must_use_password,
716 : : app_name.data, &err);
717 [ - + ]: 10 : if (LogRepWorkerWalRcvConn == NULL)
181 akapila@postgresql.o 718 [ # # ]:UNC 0 : ereport(ERROR,
719 : : errcode(ERRCODE_CONNECTION_FAILURE),
720 : : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
721 : : MySubscription->name, err));
722 : :
181 akapila@postgresql.o 723 :GNC 10 : pfree(app_name.data);
724 : :
725 : 10 : copy_sequences(LogRepWorkerWalRcvConn);
726 : : }
727 : :
728 : : /*
729 : : * Execute the initial sync with error handling. Disable the subscription,
730 : : * if required.
731 : : *
732 : : * Note that we don't handle FATAL errors which are probably because of system
733 : : * resource error and are not repeatable.
734 : : */
735 : : static void
153 nathan@postgresql.or 736 : 10 : start_sequence_sync(void)
737 : : {
181 akapila@postgresql.o 738 [ - + ]: 10 : Assert(am_sequencesync_worker());
739 : :
740 [ + + ]: 10 : PG_TRY();
741 : : {
742 : : /* Call initial sync. */
743 : 10 : LogicalRepSyncSequences();
744 : : }
745 : 5 : PG_CATCH();
746 : : {
747 [ - + ]: 5 : if (MySubscription->disableonerr)
181 akapila@postgresql.o 748 :UNC 0 : DisableSubscriptionAndExit();
749 : : else
750 : : {
751 : : /*
752 : : * Report the worker failed during sequence synchronization. Abort
753 : : * the current transaction so that the stats message is sent in an
754 : : * idle state.
755 : : */
181 akapila@postgresql.o 756 :GNC 5 : AbortOutOfAnyTransaction();
74 757 : 5 : pgstat_report_subscription_error(MySubscription->oid);
758 : :
181 759 : 5 : PG_RE_THROW();
760 : : }
761 : : }
762 [ - + ]: 5 : PG_END_TRY();
763 : 5 : }
764 : :
765 : : /* Logical Replication sequencesync worker entry point */
766 : : void
767 : 10 : SequenceSyncWorkerMain(Datum main_arg)
768 : : {
769 : 10 : int worker_slot = DatumGetInt32(main_arg);
770 : :
771 : 10 : SetupApplyOrSyncWorker(worker_slot);
772 : :
773 : 10 : start_sequence_sync();
774 : :
775 : 5 : FinishSyncWorker();
776 : : }
|