Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * sequencesync.c
3 : : * PostgreSQL logical replication: sequence synchronization
4 : : *
5 : : * Copyright (c) 2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/sequencesync.c
9 : : *
10 : : * NOTES
11 : : * This file contains code for sequence synchronization for
12 : : * logical replication.
13 : : *
14 : : * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 : : * catalog.
16 : : *
17 : : * Sequences to be synchronized will be added with state INIT when either of
18 : : * the following commands is executed:
19 : : * CREATE SUBSCRIPTION
20 : : * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 : : *
22 : : * Executing the following command resets all sequences in the subscription to
23 : : * state INIT, triggering re-synchronization:
24 : : * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 : : *
26 : : * The apply worker periodically scans pg_subscription_rel for sequences in
27 : : * INIT state. When such sequences are found, it spawns a sequencesync worker
28 : : * to handle synchronization.
29 : : *
30 : : * A single sequencesync worker is responsible for synchronizing all sequences.
31 : : * It begins by retrieving the list of sequences that are flagged for
32 : : * synchronization, i.e., those in the INIT state. These sequences are then
33 : : * processed in batches, allowing multiple entries to be synchronized within a
34 : : * single transaction. The worker fetches the current sequence values and page
35 : : * LSNs from the remote publisher, updates the corresponding sequences on the
36 : : * local subscriber, and finally marks each sequence as READY upon successful
37 : : * synchronization.
38 : : *
39 : : * Sequence state transitions follow this pattern:
40 : : * INIT -> READY
41 : : *
42 : : * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 : : * sequences are synchronized per transaction. The locks on the sequence
44 : : * relation will be periodically released at each transaction commit.
45 : : *
46 : : * XXX: We didn't choose launcher process to maintain the launch of sequencesync
47 : : * worker as it didn't have database connection to access the sequences from the
48 : : * pg_subscription_rel system catalog that need to be synchronized.
49 : : *-------------------------------------------------------------------------
50 : : */
51 : :
52 : : #include "postgres.h"
53 : :
54 : : #include "access/table.h"
55 : : #include "catalog/pg_sequence.h"
56 : : #include "catalog/pg_subscription_rel.h"
57 : : #include "commands/sequence.h"
58 : : #include "pgstat.h"
59 : : #include "postmaster/interrupt.h"
60 : : #include "replication/logicalworker.h"
61 : : #include "replication/worker_internal.h"
62 : : #include "utils/acl.h"
63 : : #include "utils/builtins.h"
64 : : #include "utils/fmgroids.h"
65 : : #include "utils/guc.h"
66 : : #include "utils/inval.h"
67 : : #include "utils/lsyscache.h"
68 : : #include "utils/memutils.h"
69 : : #include "utils/pg_lsn.h"
70 : : #include "utils/syscache.h"
71 : : #include "utils/usercontext.h"
72 : :
73 : : #define REMOTE_SEQ_COL_COUNT 10
74 : :
75 : : typedef enum CopySeqResult
76 : : {
77 : : COPYSEQ_SUCCESS,
78 : : COPYSEQ_MISMATCH,
79 : : COPYSEQ_INSUFFICIENT_PERM,
80 : : COPYSEQ_SKIPPED
81 : : } CopySeqResult;
82 : :
83 : : static List *seqinfos = NIL;
84 : :
85 : : /*
86 : : * Apply worker determines if sequence synchronization is needed.
87 : : *
88 : : * Start a sequencesync worker if one is not already running. The active
89 : : * sequencesync worker will handle all pending sequence synchronization. If any
90 : : * sequences remain unsynchronized after it exits, a new worker can be started
91 : : * in the next iteration.
92 : : */
93 : : void
42 akapila@postgresql.o 94 :GNC 3805 : ProcessSequencesForSync(void)
95 : : {
96 : : LogicalRepWorker *sequencesync_worker;
97 : : int nsyncworkers;
98 : : bool has_pending_sequences;
99 : : bool started_tx;
100 : :
101 : 3805 : FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
102 : :
103 [ + + ]: 3805 : if (started_tx)
104 : : {
105 : 177 : CommitTransactionCommand();
106 : 177 : pgstat_report_stat(true);
107 : : }
108 : :
109 [ + + ]: 3805 : if (!has_pending_sequences)
110 : 3790 : return;
111 : :
112 : 28 : LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
113 : :
114 : : /* Check if there is a sequencesync worker already running? */
115 : 28 : sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
116 : 28 : MyLogicalRepWorker->subid,
117 : : InvalidOid, true);
118 [ + + ]: 28 : if (sequencesync_worker)
119 : : {
120 : 13 : LWLockRelease(LogicalRepWorkerLock);
121 : 13 : return;
122 : : }
123 : :
124 : : /*
125 : : * Count running sync workers for this subscription, while we have the
126 : : * lock.
127 : : */
128 : 15 : nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
129 : 15 : LWLockRelease(LogicalRepWorkerLock);
130 : :
131 : : /*
132 : : * It is okay to read/update last_seqsync_start_time here in apply worker
133 : : * as we have already ensured that sync worker doesn't exist.
134 : : */
135 : 15 : launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
136 : 15 : &MyLogicalRepWorker->last_seqsync_start_time);
137 : : }
138 : :
139 : : /*
140 : : * get_sequences_string
141 : : *
142 : : * Build a comma-separated string of schema-qualified sequence names
143 : : * for the given list of sequence indexes.
144 : : */
145 : : static void
146 : 4 : get_sequences_string(List *seqindexes, StringInfo buf)
147 : : {
148 : 4 : resetStringInfo(buf);
149 [ + - + + : 12 : foreach_int(seqidx, seqindexes)
+ + ]
150 : : {
151 : : LogicalRepSequenceInfo *seqinfo =
152 : 4 : (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
153 : :
154 [ - + ]: 4 : if (buf->len > 0)
42 akapila@postgresql.o 155 :UNC 0 : appendStringInfoString(buf, ", ");
156 : :
42 akapila@postgresql.o 157 :GNC 4 : appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
158 : : }
159 : 4 : }
160 : :
161 : : /*
162 : : * report_sequence_errors
163 : : *
164 : : * Report discrepancies found during sequence synchronization between
165 : : * the publisher and subscriber. Emits warnings for:
166 : : * a) mismatched definitions or concurrent rename
167 : : * b) insufficient privileges
168 : : * c) missing sequences on the subscriber
169 : : * Then raises an ERROR to indicate synchronization failure.
170 : : */
171 : : static void
172 : 9 : report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
173 : : List *missing_seqs_idx)
174 : : {
175 : : StringInfo seqstr;
176 : :
177 : : /* Quick exit if there are no errors to report */
178 [ + + + - : 9 : if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
+ + ]
179 : 5 : return;
180 : :
181 : 4 : seqstr = makeStringInfo();
182 : :
183 [ + + ]: 4 : if (mismatched_seqs_idx)
184 : : {
185 : 3 : get_sequences_string(mismatched_seqs_idx, seqstr);
186 [ + - ]: 3 : ereport(WARNING,
187 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 : : errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
189 : : "mismatched or renamed sequences on subscriber (%s)",
190 : : list_length(mismatched_seqs_idx),
191 : : seqstr->data));
192 : : }
193 : :
194 [ - + ]: 4 : if (insuffperm_seqs_idx)
195 : : {
42 akapila@postgresql.o 196 :UNC 0 : get_sequences_string(insuffperm_seqs_idx, seqstr);
197 [ # # ]: 0 : ereport(WARNING,
198 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
199 : : errmsg_plural("insufficient privileges on sequence (%s)",
200 : : "insufficient privileges on sequences (%s)",
201 : : list_length(insuffperm_seqs_idx),
202 : : seqstr->data));
203 : : }
204 : :
42 akapila@postgresql.o 205 [ + + ]:GNC 4 : if (missing_seqs_idx)
206 : : {
207 : 1 : get_sequences_string(missing_seqs_idx, seqstr);
208 [ + - ]: 1 : ereport(WARNING,
209 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 : : errmsg_plural("missing sequence on publisher (%s)",
211 : : "missing sequences on publisher (%s)",
212 : : list_length(missing_seqs_idx),
213 : : seqstr->data));
214 : : }
215 : :
216 [ + - ]: 4 : ereport(ERROR,
217 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
218 : : errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
219 : : MySubscription->name));
220 : : }
221 : :
222 : : /*
223 : : * get_and_validate_seq_info
224 : : *
225 : : * Extracts remote sequence information from the tuple slot received from the
226 : : * publisher, and validates it against the corresponding local sequence
227 : : * definition.
228 : : */
229 : : static CopySeqResult
230 : 12 : get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
231 : : LogicalRepSequenceInfo **seqinfo, int *seqidx)
232 : : {
233 : : bool isnull;
234 : 12 : int col = 0;
235 : : Oid remote_typid;
236 : : int64 remote_start;
237 : : int64 remote_increment;
238 : : int64 remote_min;
239 : : int64 remote_max;
240 : : bool remote_cycle;
241 : 12 : CopySeqResult result = COPYSEQ_SUCCESS;
242 : : HeapTuple tup;
243 : : Form_pg_sequence local_seq;
244 : : LogicalRepSequenceInfo *seqinfo_local;
245 : :
246 : 12 : *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
247 [ - + ]: 12 : Assert(!isnull);
248 : :
249 : : /* Identify the corresponding local sequence for the given index. */
250 : 12 : *seqinfo = seqinfo_local =
251 : 12 : (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
252 : :
253 : 12 : seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
254 [ - + ]: 12 : Assert(!isnull);
255 : :
256 : 12 : seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
257 [ - + ]: 12 : Assert(!isnull);
258 : :
259 : 12 : seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
260 [ - + ]: 12 : Assert(!isnull);
261 : :
262 : 12 : remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
263 [ - + ]: 12 : Assert(!isnull);
264 : :
265 : 12 : remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
266 [ - + ]: 12 : Assert(!isnull);
267 : :
268 : 12 : remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
269 [ - + ]: 12 : Assert(!isnull);
270 : :
271 : 12 : remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
272 [ - + ]: 12 : Assert(!isnull);
273 : :
274 : 12 : remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
275 [ - + ]: 12 : Assert(!isnull);
276 : :
277 : 12 : remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
278 [ - + ]: 12 : Assert(!isnull);
279 : :
280 : : /* Sanity check */
281 [ - + ]: 12 : Assert(col == REMOTE_SEQ_COL_COUNT);
282 : :
283 : 12 : seqinfo_local->found_on_pub = true;
284 : :
285 : 12 : *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
286 : :
287 : : /* Sequence was concurrently dropped? */
288 [ - + ]: 12 : if (!*sequence_rel)
42 akapila@postgresql.o 289 :UNC 0 : return COPYSEQ_SKIPPED;
290 : :
42 akapila@postgresql.o 291 :GNC 12 : tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
292 : :
293 : : /* Sequence was concurrently dropped? */
294 [ - + ]: 12 : if (!HeapTupleIsValid(tup))
42 akapila@postgresql.o 295 [ # # ]:UNC 0 : elog(ERROR, "cache lookup failed for sequence %u",
296 : : seqinfo_local->localrelid);
297 : :
42 akapila@postgresql.o 298 :GNC 12 : local_seq = (Form_pg_sequence) GETSTRUCT(tup);
299 : :
300 : : /* Sequence parameters for remote/local are the same? */
301 [ + - ]: 12 : if (local_seq->seqtypid != remote_typid ||
302 [ + + ]: 12 : local_seq->seqstart != remote_start ||
303 [ + + ]: 11 : local_seq->seqincrement != remote_increment ||
304 [ + - ]: 9 : local_seq->seqmin != remote_min ||
305 [ + - ]: 9 : local_seq->seqmax != remote_max ||
306 [ - + ]: 9 : local_seq->seqcycle != remote_cycle)
307 : 3 : result = COPYSEQ_MISMATCH;
308 : :
309 : : /* Sequence was concurrently renamed? */
310 [ + - ]: 12 : if (strcmp(seqinfo_local->nspname,
311 : 12 : get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
312 [ - + ]: 12 : strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
42 akapila@postgresql.o 313 :UNC 0 : result = COPYSEQ_MISMATCH;
314 : :
42 akapila@postgresql.o 315 :GNC 12 : ReleaseSysCache(tup);
316 : 12 : return result;
317 : : }
318 : :
319 : : /*
320 : : * Apply remote sequence state to local sequence and mark it as
321 : : * synchronized (READY).
322 : : */
323 : : static CopySeqResult
324 : 9 : copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
325 : : {
326 : : UserContext ucxt;
327 : : AclResult aclresult;
328 : 9 : bool run_as_owner = MySubscription->runasowner;
329 : 9 : Oid seqoid = seqinfo->localrelid;
330 : :
331 : : /*
332 : : * If the user did not opt to run as the owner of the subscription
333 : : * ('run_as_owner'), then copy the sequence as the owner of the sequence.
334 : : */
335 [ + - ]: 9 : if (!run_as_owner)
336 : 9 : SwitchToUntrustedUser(seqowner, &ucxt);
337 : :
338 : 9 : aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
339 : :
340 [ - + ]: 9 : if (aclresult != ACLCHECK_OK)
341 : : {
42 akapila@postgresql.o 342 [ # # ]:UNC 0 : if (!run_as_owner)
343 : 0 : RestoreUserContext(&ucxt);
344 : :
345 : 0 : return COPYSEQ_INSUFFICIENT_PERM;
346 : : }
347 : :
348 : : /*
349 : : * The log counter (log_cnt) tracks how many sequence values are still
350 : : * unused locally. It is only relevant to the local node and managed
351 : : * internally by nextval() when allocating new ranges. Since log_cnt does
352 : : * not affect the visible sequence state (like last_value or is_called)
353 : : * and is only used for local caching, it need not be copied to the
354 : : * subscriber during synchronization.
355 : : */
42 akapila@postgresql.o 356 :GNC 9 : SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
357 : :
358 [ + - ]: 9 : if (!run_as_owner)
359 : 9 : RestoreUserContext(&ucxt);
360 : :
361 : : /*
362 : : * Record the remote sequence's LSN in pg_subscription_rel and mark the
363 : : * sequence as READY.
364 : : */
365 : 9 : UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
366 : : seqinfo->page_lsn, false);
367 : :
368 : 9 : return COPYSEQ_SUCCESS;
369 : : }
370 : :
371 : : /*
372 : : * Copy existing data of sequences from the publisher.
373 : : */
374 : : static void
375 : 9 : copy_sequences(WalReceiverConn *conn)
376 : : {
377 : 9 : int cur_batch_base_index = 0;
378 : 9 : int n_seqinfos = list_length(seqinfos);
379 : 9 : List *mismatched_seqs_idx = NIL;
380 : 9 : List *missing_seqs_idx = NIL;
381 : 9 : List *insuffperm_seqs_idx = NIL;
382 : 9 : StringInfo seqstr = makeStringInfo();
383 : 9 : StringInfo cmd = makeStringInfo();
384 : : MemoryContext oldctx;
385 : :
386 : : #define MAX_SEQUENCES_SYNC_PER_BATCH 100
387 : :
388 [ - + ]: 9 : elog(DEBUG1,
389 : : "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
390 : : MySubscription->name, n_seqinfos);
391 : :
392 [ + + ]: 18 : while (cur_batch_base_index < n_seqinfos)
393 : : {
394 : 9 : Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
395 : : BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
396 : 9 : int batch_size = 0;
397 : 9 : int batch_succeeded_count = 0;
398 : 9 : int batch_mismatched_count = 0;
399 : 9 : int batch_skipped_count = 0;
400 : 9 : int batch_insuffperm_count = 0;
401 : : int batch_missing_count;
402 : : Relation sequence_rel;
403 : :
404 : : WalRcvExecResult *res;
405 : : TupleTableSlot *slot;
406 : :
407 : 9 : StartTransactionCommand();
408 : :
409 [ + + ]: 22 : for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
410 : : {
411 : : char *nspname_literal;
412 : : char *seqname_literal;
413 : :
414 : : LogicalRepSequenceInfo *seqinfo =
415 : 13 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
416 : :
417 [ + + ]: 13 : if (seqstr->len > 0)
418 : 4 : appendStringInfoString(seqstr, ", ");
419 : :
41 420 : 13 : nspname_literal = quote_literal_cstr(seqinfo->nspname);
421 : 13 : seqname_literal = quote_literal_cstr(seqinfo->seqname);
422 : :
423 : 13 : appendStringInfo(seqstr, "(%s, %s, %d)",
424 : : nspname_literal, seqname_literal, idx);
425 : :
42 426 [ - + ]: 13 : if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
42 akapila@postgresql.o 427 :UNC 0 : break;
428 : : }
429 : :
430 : : /*
431 : : * We deliberately avoid acquiring a local lock on the sequence before
432 : : * querying the publisher to prevent potential distributed deadlocks
433 : : * in bi-directional replication setups.
434 : : *
435 : : * Example scenario:
436 : : *
437 : : * - On each node, a background worker acquires a lock on a sequence
438 : : * as part of a sync operation.
439 : : *
440 : : * - Concurrently, a user transaction attempts to alter the same
441 : : * sequence, waiting on the background worker's lock.
442 : : *
443 : : * - Meanwhile, a query from the other node tries to access metadata
444 : : * that depends on the completion of the alter operation.
445 : : *
446 : : * - This creates a circular wait across nodes:
447 : : *
448 : : * Node-1: Query -> waits on Alter -> waits on Sync Worker
449 : : *
450 : : * Node-2: Query -> waits on Alter -> waits on Sync Worker
451 : : *
452 : : * Since each node only sees part of the wait graph, the deadlock may
453 : : * go undetected, leading to indefinite blocking.
454 : : *
455 : : * Note: Each entry in VALUES includes an index 'seqidx' that
456 : : * represents the sequence's position in the local 'seqinfos' list.
457 : : * This index is propagated to the query results and later used to
458 : : * directly map the fetched publisher sequence rows back to their
459 : : * corresponding local entries without relying on result order or name
460 : : * matching.
461 : : */
42 akapila@postgresql.o 462 :GNC 9 : appendStringInfo(cmd,
463 : : "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
464 : : " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
465 : : " seq.seqmax, seq.seqcycle\n"
466 : : "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
467 : : "JOIN pg_namespace n ON n.nspname = s.schname\n"
468 : : "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
469 : : "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
470 : : "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
471 : : seqstr->data);
472 : :
473 : 9 : res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
474 [ - + ]: 9 : if (res->status != WALRCV_OK_TUPLES)
42 akapila@postgresql.o 475 [ # # ]:UNC 0 : ereport(ERROR,
476 : : errcode(ERRCODE_CONNECTION_FAILURE),
477 : : errmsg("could not fetch sequence information from the publisher: %s",
478 : : res->err));
479 : :
42 akapila@postgresql.o 480 :GNC 9 : slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
481 [ + + ]: 21 : while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
482 : : {
483 : : CopySeqResult sync_status;
484 : : LogicalRepSequenceInfo *seqinfo;
485 : : int seqidx;
486 : :
487 [ - + ]: 12 : CHECK_FOR_INTERRUPTS();
488 : :
489 [ - + ]: 12 : if (ConfigReloadPending)
490 : : {
42 akapila@postgresql.o 491 :UNC 0 : ConfigReloadPending = false;
492 : 0 : ProcessConfigFile(PGC_SIGHUP);
493 : : }
494 : :
42 akapila@postgresql.o 495 :GNC 12 : sync_status = get_and_validate_seq_info(slot, &sequence_rel,
496 : : &seqinfo, &seqidx);
497 [ + + ]: 12 : if (sync_status == COPYSEQ_SUCCESS)
498 : 9 : sync_status = copy_sequence(seqinfo,
499 : 9 : sequence_rel->rd_rel->relowner);
500 : :
501 [ + + - - : 12 : switch (sync_status)
- ]
502 : : {
503 : 9 : case COPYSEQ_SUCCESS:
504 [ - + ]: 9 : elog(DEBUG1,
505 : : "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
506 : : MySubscription->name, seqinfo->nspname,
507 : : seqinfo->seqname);
508 : 9 : batch_succeeded_count++;
509 : 9 : break;
510 : 3 : case COPYSEQ_MISMATCH:
511 : :
512 : : /*
513 : : * Remember mismatched sequences in a long-lived memory
514 : : * context since these will be used after the transaction
515 : : * is committed.
516 : : */
517 : 3 : oldctx = MemoryContextSwitchTo(ApplyContext);
518 : 3 : mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
519 : : seqidx);
520 : 3 : MemoryContextSwitchTo(oldctx);
521 : 3 : batch_mismatched_count++;
522 : 3 : break;
42 akapila@postgresql.o 523 :UNC 0 : case COPYSEQ_INSUFFICIENT_PERM:
524 : :
525 : : /*
526 : : * Remember sequences with insufficient privileges in a
527 : : * long-lived memory context since these will be used
528 : : * after the transaction is committed.
529 : : */
530 : 0 : oldctx = MemoryContextSwitchTo(ApplyContext);
531 : 0 : insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
532 : : seqidx);
533 : 0 : MemoryContextSwitchTo(oldctx);
534 : 0 : batch_insuffperm_count++;
535 : 0 : break;
536 : 0 : case COPYSEQ_SKIPPED:
537 [ # # ]: 0 : ereport(LOG,
538 : : errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
539 : : seqinfo->nspname,
540 : : seqinfo->seqname));
541 : 0 : batch_skipped_count++;
542 : 0 : break;
543 : : }
544 : :
42 akapila@postgresql.o 545 [ + - ]:GNC 12 : if (sequence_rel)
546 : 12 : table_close(sequence_rel, NoLock);
547 : : }
548 : :
549 : 9 : ExecDropSingleTupleTableSlot(slot);
550 : 9 : walrcv_clear_result(res);
551 : 9 : resetStringInfo(seqstr);
552 : 9 : resetStringInfo(cmd);
553 : :
554 : 9 : batch_missing_count = batch_size - (batch_succeeded_count +
555 : 9 : batch_mismatched_count +
556 : 9 : batch_insuffperm_count +
557 : : batch_skipped_count);
558 : :
559 [ - + ]: 9 : elog(DEBUG1,
560 : : "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
561 : : MySubscription->name,
562 : : (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
563 : : batch_size, batch_succeeded_count, batch_mismatched_count,
564 : : batch_insuffperm_count, batch_missing_count, batch_skipped_count);
565 : :
566 : : /* Commit this batch, and prepare for next batch */
567 : 9 : CommitTransactionCommand();
568 : :
569 [ + + ]: 9 : if (batch_missing_count)
570 : : {
571 [ + + ]: 2 : for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
572 : : {
573 : : LogicalRepSequenceInfo *seqinfo =
574 : 1 : (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
575 : :
576 : : /* If the sequence was not found on publisher, record it */
577 [ + - ]: 1 : if (!seqinfo->found_on_pub)
578 : 1 : missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
579 : : }
580 : : }
581 : :
582 : : /*
583 : : * cur_batch_base_index is not incremented sequentially because some
584 : : * sequences may be missing, and the number of fetched rows may not
585 : : * match the batch size.
586 : : */
587 : 9 : cur_batch_base_index += batch_size;
588 : : }
589 : :
590 : : /* Report mismatches, permission issues, or missing sequences */
591 : 9 : report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
592 : : missing_seqs_idx);
593 : 5 : }
594 : :
595 : : /*
596 : : * Identifies sequences that require synchronization and initiates the
597 : : * synchronization process.
598 : : */
599 : : static void
600 : 9 : LogicalRepSyncSequences(void)
601 : : {
602 : : char *err;
603 : : bool must_use_password;
604 : : Relation rel;
605 : : HeapTuple tup;
606 : : ScanKeyData skey[2];
607 : : SysScanDesc scan;
608 : 9 : Oid subid = MyLogicalRepWorker->subid;
609 : : StringInfoData app_name;
610 : :
611 : 9 : StartTransactionCommand();
612 : :
613 : 9 : rel = table_open(SubscriptionRelRelationId, AccessShareLock);
614 : :
615 : 9 : ScanKeyInit(&skey[0],
616 : : Anum_pg_subscription_rel_srsubid,
617 : : BTEqualStrategyNumber, F_OIDEQ,
618 : : ObjectIdGetDatum(subid));
619 : :
620 : 9 : ScanKeyInit(&skey[1],
621 : : Anum_pg_subscription_rel_srsubstate,
622 : : BTEqualStrategyNumber, F_CHAREQ,
623 : : CharGetDatum(SUBREL_STATE_INIT));
624 : :
625 : 9 : scan = systable_beginscan(rel, InvalidOid, false,
626 : : NULL, 2, skey);
627 [ + + ]: 23 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
628 : : {
629 : : Form_pg_subscription_rel subrel;
630 : : LogicalRepSequenceInfo *seq;
631 : : Relation sequence_rel;
632 : : MemoryContext oldctx;
633 : :
634 [ - + ]: 14 : CHECK_FOR_INTERRUPTS();
635 : :
636 : 14 : subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
637 : :
638 : 14 : sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
639 : :
640 : : /* Skip if sequence was dropped concurrently */
641 [ - + ]: 14 : if (!sequence_rel)
42 akapila@postgresql.o 642 :UNC 0 : continue;
643 : :
644 : : /* Skip if the relation is not a sequence */
42 akapila@postgresql.o 645 [ + + ]:GNC 14 : if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
646 : : {
647 : 1 : table_close(sequence_rel, NoLock);
648 : 1 : continue;
649 : : }
650 : :
651 : : /*
652 : : * Worker needs to process sequences across transaction boundary, so
653 : : * allocate them under long-lived context.
654 : : */
655 : 13 : oldctx = MemoryContextSwitchTo(ApplyContext);
656 : :
657 : 13 : seq = palloc0_object(LogicalRepSequenceInfo);
658 : 13 : seq->localrelid = subrel->srrelid;
659 : 13 : seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
660 : 13 : seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
661 : 13 : seqinfos = lappend(seqinfos, seq);
662 : :
663 : 13 : MemoryContextSwitchTo(oldctx);
664 : :
665 : 13 : table_close(sequence_rel, NoLock);
666 : : }
667 : :
668 : : /* Cleanup */
669 : 9 : systable_endscan(scan);
670 : 9 : table_close(rel, AccessShareLock);
671 : :
672 : 9 : CommitTransactionCommand();
673 : :
674 : : /*
675 : : * Exit early if no catalog entries found, likely due to concurrent drops.
676 : : */
677 [ - + ]: 9 : if (!seqinfos)
42 akapila@postgresql.o 678 :UNC 0 : return;
679 : :
680 : : /* Is the use of a password mandatory? */
42 akapila@postgresql.o 681 [ + - ]:GNC 18 : must_use_password = MySubscription->passwordrequired &&
682 [ - + ]: 9 : !MySubscription->ownersuperuser;
683 : :
684 : 9 : initStringInfo(&app_name);
685 : 9 : appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
686 : 9 : MySubscription->oid, GetSystemIdentifier());
687 : :
688 : : /*
689 : : * Establish the connection to the publisher for sequence synchronization.
690 : : */
691 : 9 : LogRepWorkerWalRcvConn =
692 : 9 : walrcv_connect(MySubscription->conninfo, true, true,
693 : : must_use_password,
694 : : app_name.data, &err);
695 [ - + ]: 9 : if (LogRepWorkerWalRcvConn == NULL)
42 akapila@postgresql.o 696 [ # # ]:UNC 0 : ereport(ERROR,
697 : : errcode(ERRCODE_CONNECTION_FAILURE),
698 : : errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
699 : : MySubscription->name, err));
700 : :
42 akapila@postgresql.o 701 :GNC 9 : pfree(app_name.data);
702 : :
703 : 9 : copy_sequences(LogRepWorkerWalRcvConn);
704 : : }
705 : :
706 : : /*
707 : : * Execute the initial sync with error handling. Disable the subscription,
708 : : * if required.
709 : : *
710 : : * Note that we don't handle FATAL errors which are probably because of system
711 : : * resource error and are not repeatable.
712 : : */
713 : : static void
14 nathan@postgresql.or 714 : 9 : start_sequence_sync(void)
715 : : {
42 akapila@postgresql.o 716 [ - + ]: 9 : Assert(am_sequencesync_worker());
717 : :
718 [ + + ]: 9 : PG_TRY();
719 : : {
720 : : /* Call initial sync. */
721 : 9 : LogicalRepSyncSequences();
722 : : }
723 : 4 : PG_CATCH();
724 : : {
725 [ - + ]: 4 : if (MySubscription->disableonerr)
42 akapila@postgresql.o 726 :UNC 0 : DisableSubscriptionAndExit();
727 : : else
728 : : {
729 : : /*
730 : : * Report the worker failed during sequence synchronization. Abort
731 : : * the current transaction so that the stats message is sent in an
732 : : * idle state.
733 : : */
42 akapila@postgresql.o 734 :GNC 4 : AbortOutOfAnyTransaction();
40 735 : 4 : pgstat_report_subscription_error(MySubscription->oid,
736 : : WORKERTYPE_SEQUENCESYNC);
737 : :
42 738 : 4 : PG_RE_THROW();
739 : : }
740 : : }
741 [ - + ]: 5 : PG_END_TRY();
742 : 5 : }
743 : :
744 : : /* Logical Replication sequencesync worker entry point */
745 : : void
746 : 9 : SequenceSyncWorkerMain(Datum main_arg)
747 : : {
748 : 9 : int worker_slot = DatumGetInt32(main_arg);
749 : :
750 : 9 : SetupApplyOrSyncWorker(worker_slot);
751 : :
752 : 9 : start_sequence_sync();
753 : :
754 : 5 : FinishSyncWorker();
755 : : }
|