Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * syncutils.c
3 : : * PostgreSQL logical replication: common synchronization code
4 : : *
5 : : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/syncutils.c
9 : : *
10 : : * NOTES
11 : : * This file contains code common for synchronization workers.
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "catalog/pg_subscription_rel.h"
18 : : #include "pgstat.h"
19 : : #include "replication/logicallauncher.h"
20 : : #include "replication/worker_internal.h"
21 : : #include "storage/ipc.h"
22 : : #include "utils/lsyscache.h"
23 : : #include "utils/memutils.h"
24 : :
/*
 * Validity of the cached subscription relation states.
 *
 * SYNC_RELATIONS_STATE_NEEDS_REBUILD
 *		The cached state is no longer valid and has to be rebuilt.
 *
 * SYNC_RELATIONS_STATE_REBUILD_STARTED
 *		A rebuild of the cached state is currently in progress.
 *
 * SYNC_RELATIONS_STATE_VALID
 *		The cached state is up-to-date and can be used as is.
 */
typedef enum
{
	SYNC_RELATIONS_STATE_NEEDS_REBUILD = 0,
	SYNC_RELATIONS_STATE_REBUILD_STARTED = 1,
	SYNC_RELATIONS_STATE_VALID = 2,
} SyncingRelationsState;

/* Nothing has been cached at startup, so the first access must rebuild. */
static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
45 : :
46 : : /*
47 : : * Exit routine for synchronization worker.
48 : : */
49 : : pg_noreturn void
150 akapila@postgresql.o 50 :GNC 198 : FinishSyncWorker(void)
51 : : {
130 52 [ + + - + ]: 198 : Assert(am_sequencesync_worker() || am_tablesync_worker());
53 : :
54 : : /*
55 : : * Commit any outstanding transaction. This is the usual case, unless
56 : : * there was nothing to do for the table.
57 : : */
150 58 [ + + ]: 198 : if (IsTransactionState())
59 : : {
60 : 193 : CommitTransactionCommand();
61 : 193 : pgstat_report_stat(true);
62 : : }
63 : :
64 : : /* And flush all writes. */
65 : 198 : XLogFlush(GetXLogWriteRecPtr());
66 : :
130 67 [ + + ]: 198 : if (am_sequencesync_worker())
68 : : {
69 [ + - ]: 5 : ereport(LOG,
70 : : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
71 : : MySubscription->name));
72 : :
73 : : /*
74 : : * Reset last_seqsync_start_time, so that next time a sequencesync
75 : : * worker is needed it can be started promptly.
76 : : */
77 : 5 : logicalrep_reset_seqsync_start_time();
78 : : }
79 : : else
80 : : {
81 : 193 : StartTransactionCommand();
82 [ + - ]: 193 : ereport(LOG,
83 : : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
84 : : MySubscription->name,
85 : : get_rel_name(MyLogicalRepWorker->relid)));
86 : 193 : CommitTransactionCommand();
87 : :
88 : : /* Find the leader apply worker and signal it. */
89 : 193 : logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
90 : : InvalidOid);
91 : : }
92 : :
93 : : /* Stop gracefully */
150 94 : 198 : proc_exit(0);
95 : : }
96 : :
97 : : /*
98 : : * Callback from syscache invalidation.
99 : : */
100 : : void
25 michael@paquier.xyz 101 : 2040 : InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
102 : : uint32 hashvalue)
103 : : {
150 akapila@postgresql.o 104 : 2040 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
105 : 2040 : }
106 : :
107 : : /*
108 : : * Attempt to launch a sync worker for one or more sequences or a table, if
109 : : * a worker slot is available and the retry interval has elapsed.
110 : : *
111 : : * wtype: sync worker type.
112 : : * nsyncworkers: Number of currently running sync workers for the subscription.
113 : : * relid: InvalidOid for sequencesync worker, actual relid for tablesync
114 : : * worker.
115 : : * last_start_time: Pointer to the last start time of the worker.
116 : : */
117 : : void
130 118 : 946 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
119 : : TimestampTz *last_start_time)
120 : : {
121 : : TimestampTz now;
122 : :
123 [ + + - + : 946 : Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+ - - + ]
124 : : (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
125 : :
126 : : /* If there is a free sync worker slot, start a new sync worker */
127 [ + + ]: 946 : if (nsyncworkers >= max_sync_workers_per_subscription)
128 : 707 : return;
129 : :
130 : 239 : now = GetCurrentTimestamp();
131 : :
132 [ + + + + ]: 272 : if (!(*last_start_time) ||
133 : 33 : TimestampDifferenceExceeds(*last_start_time, now,
134 : : wal_retrieve_retry_interval))
135 : : {
136 : : /*
137 : : * Set the last_start_time even if we fail to start the worker, so
138 : : * that we won't retry until wal_retrieve_retry_interval has elapsed.
139 : : */
140 : 215 : *last_start_time = now;
141 : 215 : (void) logicalrep_worker_launch(wtype,
142 : 215 : MyLogicalRepWorker->dbid,
143 : 215 : MySubscription->oid,
144 : 215 : MySubscription->name,
145 : 215 : MyLogicalRepWorker->userid,
146 : : relid, DSM_HANDLE_INVALID, false);
147 : : }
148 : : }
149 : :
150 : : /*
151 : : * Process possible state change(s) of relations that are being synchronized
152 : : * and start new tablesync workers for the newly added tables. Also, start a
153 : : * new sequencesync worker for the newly added sequences.
154 : : */
155 : : void
150 156 : 5648 : ProcessSyncingRelations(XLogRecPtr current_lsn)
157 : : {
158 [ + + + - : 5648 : switch (MyLogicalRepWorker->type)
- - ]
159 : : {
160 : 23 : case WORKERTYPE_PARALLEL_APPLY:
161 : :
162 : : /*
163 : : * Skip for parallel apply workers because they only operate on
164 : : * tables that are in a READY state. See pa_can_start() and
165 : : * should_apply_changes_for_rel().
166 : : */
167 : 23 : break;
168 : :
169 : 205 : case WORKERTYPE_TABLESYNC:
170 : 205 : ProcessSyncingTablesForSync(current_lsn);
171 : 12 : break;
172 : :
173 : 5420 : case WORKERTYPE_APPLY:
174 : 5420 : ProcessSyncingTablesForApply(current_lsn);
130 175 : 5413 : ProcessSequencesForSync();
176 : 5413 : break;
177 : :
130 akapila@postgresql.o 178 :UNC 0 : case WORKERTYPE_SEQUENCESYNC:
179 : : /* Should never happen. */
180 [ # # ]: 0 : elog(ERROR, "sequence synchronization worker is not expected to process relations");
181 : : break;
182 : :
150 183 : 0 : case WORKERTYPE_UNKNOWN:
184 : : /* Should never happen. */
185 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
186 : : }
150 akapila@postgresql.o 187 :GNC 5448 : }
188 : :
189 : : /*
190 : : * Common code to fetch the up-to-date sync state info for tables and sequences.
191 : : *
192 : : * The pg_subscription_rel catalog is shared by tables and sequences. Changes
193 : : * to either sequences or tables can affect the validity of relation states, so
194 : : * we identify non-READY tables and non-READY sequences together to ensure
195 : : * consistency.
196 : : *
197 : : * has_pending_subtables: true if the subscription has one or more tables that
198 : : * are not in READY state, otherwise false.
199 : : * has_pending_subsequences: true if the subscription has one or more sequences
200 : : * that are not in READY state, otherwise false.
201 : : */
202 : : void
130 203 : 11151 : FetchRelationStates(bool *has_pending_subtables,
204 : : bool *has_pending_subsequences,
205 : : bool *started_tx)
206 : : {
207 : : /*
208 : : * has_subtables and has_subsequences_non_ready are declared as static,
209 : : * since the same value can be used until the system table is invalidated.
210 : : */
211 : : static bool has_subtables = false;
212 : : static bool has_subsequences_non_ready = false;
213 : :
150 214 : 11151 : *started_tx = false;
215 : :
216 [ + + ]: 11151 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
217 : : {
218 : : MemoryContext oldctx;
219 : : List *rstates;
220 : : SubscriptionRelState *rstate;
221 : :
222 : 1014 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
130 223 : 1014 : has_subsequences_non_ready = false;
224 : :
225 : : /* Clean the old lists. */
150 226 : 1014 : list_free_deep(table_states_not_ready);
227 : 1014 : table_states_not_ready = NIL;
228 : :
229 [ + + ]: 1014 : if (!IsTransactionState())
230 : : {
231 : 997 : StartTransactionCommand();
232 : 997 : *started_tx = true;
233 : : }
234 : :
235 : : /* Fetch tables and sequences that are in non-READY state. */
130 236 : 1014 : rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
237 : : true);
238 : :
239 : : /* Allocate the tracking info in a permanent memory context. */
150 240 : 1014 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
130 241 [ + + + + : 3878 : foreach_ptr(SubscriptionRelState, subrel, rstates)
+ + ]
242 : : {
243 [ + + ]: 1850 : if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
244 : 16 : has_subsequences_non_ready = true;
245 : : else
246 : : {
95 michael@paquier.xyz 247 : 1834 : rstate = palloc_object(SubscriptionRelState);
130 akapila@postgresql.o 248 : 1834 : memcpy(rstate, subrel, sizeof(SubscriptionRelState));
249 : 1834 : table_states_not_ready = lappend(table_states_not_ready,
250 : : rstate);
251 : : }
252 : : }
150 253 : 1014 : MemoryContextSwitchTo(oldctx);
254 : :
255 : : /*
256 : : * Does the subscription have tables?
257 : : *
258 : : * If there were not-READY tables found then we know it does. But if
259 : : * table_states_not_ready was empty we still need to check again to
260 : : * see if there are 0 tables.
261 : : */
262 [ + + + + ]: 1291 : has_subtables = (table_states_not_ready != NIL) ||
263 : 277 : HasSubscriptionTables(MySubscription->oid);
264 : :
265 : : /*
266 : : * If the subscription relation cache has been invalidated since we
267 : : * entered this routine, we still use and return the relations we just
268 : : * finished constructing, to avoid infinite loops, but we leave the
269 : : * table states marked as stale so that we'll rebuild it again on next
270 : : * access. Otherwise, we mark the table states as valid.
271 : : */
272 [ + + ]: 1014 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
273 : 1010 : relation_states_validity = SYNC_RELATIONS_STATE_VALID;
274 : : }
275 : :
130 276 [ + + ]: 11151 : if (has_pending_subtables)
277 : 318 : *has_pending_subtables = has_subtables;
278 : :
279 [ + + ]: 11151 : if (has_pending_subsequences)
280 : 5413 : *has_pending_subsequences = has_subsequences_non_ready;
150 281 : 11151 : }
|