Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : * syncutils.c
3 : : * PostgreSQL logical replication: common synchronization code
4 : : *
5 : : * Copyright (c) 2025, PostgreSQL Global Development Group
6 : : *
7 : : * IDENTIFICATION
8 : : * src/backend/replication/logical/syncutils.c
9 : : *
10 : : * NOTES
11 : : * This file contains code common for synchronization workers.
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "catalog/pg_subscription_rel.h"
18 : : #include "pgstat.h"
19 : : #include "replication/logicallauncher.h"
20 : : #include "replication/worker_internal.h"
21 : : #include "storage/ipc.h"
22 : : #include "utils/lsyscache.h"
23 : : #include "utils/memutils.h"
24 : :
/*
 * Validity of the cached subscription relation states.
 *
 * The cache starts out needing a rebuild.  FetchRelationStates() moves it to
 * REBUILD_STARTED while reloading and to VALID once the reload finishes
 * without the state having been reset in the meantime;
 * InvalidateSyncingRelStates() resets it to NEEDS_REBUILD.
 */
typedef enum
{
	/* The cached state is no longer valid and has to be rebuilt. */
	SYNC_RELATIONS_STATE_NEEDS_REBUILD,

	/* The cached state is currently being rebuilt. */
	SYNC_RELATIONS_STATE_REBUILD_STARTED,

	/* The cached state is up-to-date and valid. */
	SYNC_RELATIONS_STATE_VALID,
} SyncingRelationsState;

/* Current validity of the cached relation states; initially stale. */
static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
45 : :
46 : : /*
47 : : * Exit routine for synchronization worker.
48 : : */
49 : : pg_noreturn void
62 akapila@postgresql.o 50 :GNC 190 : FinishSyncWorker(void)
51 : : {
42 52 [ + + - + ]: 190 : Assert(am_sequencesync_worker() || am_tablesync_worker());
53 : :
54 : : /*
55 : : * Commit any outstanding transaction. This is the usual case, unless
56 : : * there was nothing to do for the table.
57 : : */
62 58 [ + + ]: 190 : if (IsTransactionState())
59 : : {
60 : 185 : CommitTransactionCommand();
61 : 185 : pgstat_report_stat(true);
62 : : }
63 : :
64 : : /* And flush all writes. */
65 : 190 : XLogFlush(GetXLogWriteRecPtr());
66 : :
42 67 [ + + ]: 190 : if (am_sequencesync_worker())
68 : : {
69 [ + - ]: 5 : ereport(LOG,
70 : : errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
71 : : MySubscription->name));
72 : :
73 : : /*
74 : : * Reset last_seqsync_start_time, so that next time a sequencesync
75 : : * worker is needed it can be started promptly.
76 : : */
77 : 5 : logicalrep_reset_seqsync_start_time();
78 : : }
79 : : else
80 : : {
81 : 185 : StartTransactionCommand();
82 [ + - ]: 185 : ereport(LOG,
83 : : errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
84 : : MySubscription->name,
85 : : get_rel_name(MyLogicalRepWorker->relid)));
86 : 185 : CommitTransactionCommand();
87 : :
88 : : /* Find the leader apply worker and signal it. */
89 : 185 : logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
90 : : InvalidOid);
91 : : }
92 : :
93 : : /* Stop gracefully */
62 94 : 190 : proc_exit(0);
95 : : }
96 : :
97 : : /*
98 : : * Callback from syscache invalidation.
99 : : */
100 : : void
101 : 1750 : InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
102 : : {
103 : 1750 : relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
104 : 1750 : }
105 : :
106 : : /*
107 : : * Attempt to launch a sync worker for one or more sequences or a table, if
108 : : * a worker slot is available and the retry interval has elapsed.
109 : : *
110 : : * wtype: sync worker type.
111 : : * nsyncworkers: Number of currently running sync workers for the subscription.
112 : : * relid: InvalidOid for sequencesync worker, actual relid for tablesync
113 : : * worker.
114 : : * last_start_time: Pointer to the last start time of the worker.
115 : : */
116 : : void
42 117 : 909 : launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
118 : : TimestampTz *last_start_time)
119 : : {
120 : : TimestampTz now;
121 : :
122 [ + + - + : 909 : Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+ - - + ]
123 : : (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
124 : :
125 : : /* If there is a free sync worker slot, start a new sync worker */
126 [ + + ]: 909 : if (nsyncworkers >= max_sync_workers_per_subscription)
127 : 681 : return;
128 : :
129 : 228 : now = GetCurrentTimestamp();
130 : :
131 [ + + + + ]: 258 : if (!(*last_start_time) ||
132 : 30 : TimestampDifferenceExceeds(*last_start_time, now,
133 : : wal_retrieve_retry_interval))
134 : : {
135 : : /*
136 : : * Set the last_start_time even if we fail to start the worker, so
137 : : * that we won't retry until wal_retrieve_retry_interval has elapsed.
138 : : */
139 : 207 : *last_start_time = now;
140 : 207 : (void) logicalrep_worker_launch(wtype,
141 : 207 : MyLogicalRepWorker->dbid,
142 : 207 : MySubscription->oid,
143 : 207 : MySubscription->name,
144 : 207 : MyLogicalRepWorker->userid,
145 : : relid, DSM_HANDLE_INVALID, false);
146 : : }
147 : : }
148 : :
149 : : /*
150 : : * Process possible state change(s) of relations that are being synchronized
151 : : * and start new tablesync workers for the newly added tables. Also, start a
152 : : * new sequencesync worker for the newly added sequences.
153 : : */
154 : : void
62 155 : 4034 : ProcessSyncingRelations(XLogRecPtr current_lsn)
156 : : {
157 [ + + + - : 4034 : switch (MyLogicalRepWorker->type)
- - ]
158 : : {
159 : 22 : case WORKERTYPE_PARALLEL_APPLY:
160 : :
161 : : /*
162 : : * Skip for parallel apply workers because they only operate on
163 : : * tables that are in a READY state. See pa_can_start() and
164 : : * should_apply_changes_for_rel().
165 : : */
166 : 22 : break;
167 : :
168 : 200 : case WORKERTYPE_TABLESYNC:
169 : 200 : ProcessSyncingTablesForSync(current_lsn);
170 : 15 : break;
171 : :
172 : 3812 : case WORKERTYPE_APPLY:
173 : 3812 : ProcessSyncingTablesForApply(current_lsn);
42 174 : 3805 : ProcessSequencesForSync();
175 : 3805 : break;
176 : :
42 akapila@postgresql.o 177 :UNC 0 : case WORKERTYPE_SEQUENCESYNC:
178 : : /* Should never happen. */
179 [ # # ]: 0 : elog(ERROR, "sequence synchronization worker is not expected to process relations");
180 : : break;
181 : :
62 182 : 0 : case WORKERTYPE_UNKNOWN:
183 : : /* Should never happen. */
184 [ # # ]: 0 : elog(ERROR, "Unknown worker type");
185 : : }
62 akapila@postgresql.o 186 :GNC 3842 : }
187 : :
188 : : /*
189 : : * Common code to fetch the up-to-date sync state info for tables and sequences.
190 : : *
191 : : * The pg_subscription_rel catalog is shared by tables and sequences. Changes
192 : : * to either sequences or tables can affect the validity of relation states, so
193 : : * we identify non-READY tables and non-READY sequences together to ensure
194 : : * consistency.
195 : : *
196 : : * has_pending_subtables: true if the subscription has one or more tables that
197 : : * are not in READY state, otherwise false.
198 : : * has_pending_subsequences: true if the subscription has one or more sequences
199 : : * that are not in READY state, otherwise false.
200 : : */
201 : : void
42 202 : 7856 : FetchRelationStates(bool *has_pending_subtables,
203 : : bool *has_pending_subsequences,
204 : : bool *started_tx)
205 : : {
206 : : /*
207 : : * has_subtables and has_subsequences_non_ready are declared as static,
208 : : * since the same value can be used until the system table is invalidated.
209 : : */
210 : : static bool has_subtables = false;
211 : : static bool has_subsequences_non_ready = false;
212 : :
62 213 : 7856 : *started_tx = false;
214 : :
215 [ + + ]: 7856 : if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
216 : : {
217 : : MemoryContext oldctx;
218 : : List *rstates;
219 : : SubscriptionRelState *rstate;
220 : :
221 : 964 : relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
42 222 : 964 : has_subsequences_non_ready = false;
223 : :
224 : : /* Clean the old lists. */
62 225 : 964 : list_free_deep(table_states_not_ready);
226 : 964 : table_states_not_ready = NIL;
227 : :
228 [ + + ]: 964 : if (!IsTransactionState())
229 : : {
230 : 948 : StartTransactionCommand();
231 : 948 : *started_tx = true;
232 : : }
233 : :
234 : : /* Fetch tables and sequences that are in non-READY state. */
42 235 : 964 : rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
236 : : true);
237 : :
238 : : /* Allocate the tracking info in a permanent memory context. */
62 239 : 964 : oldctx = MemoryContextSwitchTo(CacheMemoryContext);
42 240 [ + + + + : 3729 : foreach_ptr(SubscriptionRelState, subrel, rstates)
+ + ]
241 : : {
242 [ + + ]: 1801 : if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
243 : 16 : has_subsequences_non_ready = true;
244 : : else
245 : : {
7 michael@paquier.xyz 246 : 1785 : rstate = palloc_object(SubscriptionRelState);
42 akapila@postgresql.o 247 : 1785 : memcpy(rstate, subrel, sizeof(SubscriptionRelState));
248 : 1785 : table_states_not_ready = lappend(table_states_not_ready,
249 : : rstate);
250 : : }
251 : : }
62 252 : 964 : MemoryContextSwitchTo(oldctx);
253 : :
254 : : /*
255 : : * Does the subscription have tables?
256 : : *
257 : : * If there were not-READY tables found then we know it does. But if
258 : : * table_states_not_ready was empty we still need to check again to
259 : : * see if there are 0 tables.
260 : : */
261 [ + + + + ]: 1217 : has_subtables = (table_states_not_ready != NIL) ||
262 : 253 : HasSubscriptionTables(MySubscription->oid);
263 : :
264 : : /*
265 : : * If the subscription relation cache has been invalidated since we
266 : : * entered this routine, we still use and return the relations we just
267 : : * finished constructing, to avoid infinite loops, but we leave the
268 : : * table states marked as stale so that we'll rebuild it again on next
269 : : * access. Otherwise, we mark the table states as valid.
270 : : */
271 [ + + ]: 964 : if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
272 : 957 : relation_states_validity = SYNC_RELATIONS_STATE_VALID;
273 : : }
274 : :
42 275 [ + + ]: 7856 : if (has_pending_subtables)
276 : 239 : *has_pending_subtables = has_subtables;
277 : :
278 [ + + ]: 7856 : if (has_pending_subsequences)
279 : 3805 : *has_pending_subsequences = has_subsequences_non_ready;
62 280 : 7856 : }
|