Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * worker_internal.h
4 : : * Internal headers shared by logical replication workers.
5 : : *
6 : : * Portions Copyright (c) 2016-2026, PostgreSQL Global Development Group
7 : : *
8 : : * src/include/replication/worker_internal.h
9 : : *
10 : : *-------------------------------------------------------------------------
11 : : */
12 : : #ifndef WORKER_INTERNAL_H
13 : : #define WORKER_INTERNAL_H
14 : :
15 : : #include "access/xlogdefs.h"
16 : : #include "catalog/pg_subscription.h"
17 : : #include "datatype/timestamp.h"
18 : : #include "miscadmin.h"
19 : : #include "replication/logicalrelation.h"
20 : : #include "replication/walreceiver.h"
21 : : #include "storage/buffile.h"
22 : : #include "storage/fileset.h"
23 : : #include "storage/shm_mq.h"
24 : : #include "storage/shm_toc.h"
25 : : #include "storage/spin.h"
26 : :
27 : : /* Different types of worker */
28 : : typedef enum LogicalRepWorkerType
29 : : {
30 : : WORKERTYPE_UNKNOWN = 0,
31 : : WORKERTYPE_TABLESYNC,
32 : : WORKERTYPE_SEQUENCESYNC,
33 : : WORKERTYPE_APPLY,
34 : : WORKERTYPE_PARALLEL_APPLY,
35 : : } LogicalRepWorkerType;
36 : :
37 : : typedef struct LogicalRepWorker
38 : : {
39 : : /* What type of worker is this? */
40 : : LogicalRepWorkerType type;
41 : :
42 : : /* Time at which this worker was launched. */
43 : : TimestampTz launch_time;
44 : :
45 : : /* Indicates if this slot is used or free. */
46 : : bool in_use;
47 : :
48 : : /* Increased every time the slot is taken by new worker. */
49 : : uint16 generation;
50 : :
51 : : /* Pointer to proc array. NULL if not running. */
52 : : PGPROC *proc;
53 : :
54 : : /* Database id to connect to. */
55 : : Oid dbid;
56 : :
57 : : /* User to use for connection (will be same as owner of subscription). */
58 : : Oid userid;
59 : :
60 : : /* Subscription id for the worker. */
61 : : Oid subid;
62 : :
63 : : /* Used for initial table synchronization. */
64 : : Oid relid;
65 : : char relstate;
66 : : XLogRecPtr relstate_lsn;
67 : : slock_t relmutex;
68 : :
69 : : /*
70 : : * Used to create the changes and subxact files for the streaming
71 : : * transactions. Upon the arrival of the first streaming transaction or
72 : : * when the first-time leader apply worker times out while sending changes
73 : : * to the parallel apply worker, the fileset will be initialized, and it
74 : : * will be deleted when the worker exits. Under this, separate buffiles
75 : : * would be created for each transaction which will be deleted after the
76 : : * transaction is finished.
77 : : */
78 : : FileSet *stream_fileset;
79 : :
80 : : /*
81 : : * PID of leader apply worker if this slot is used for a parallel apply
82 : : * worker, InvalidPid otherwise.
83 : : */
84 : : pid_t leader_pid;
85 : :
86 : : /* Indicates whether apply can be performed in parallel. */
87 : : bool parallel_apply;
88 : :
89 : : /*
90 : : * Changes made by this transaction and subsequent ones must be preserved.
91 : : * This ensures that update_deleted conflicts can be accurately detected
92 : : * during the apply phase of logical replication by this worker.
93 : : *
94 : : * The logical replication launcher manages an internal replication slot
95 : : * named "pg_conflict_detection". It asynchronously collects this ID to
96 : : * decide when to advance the xmin value of the slot.
97 : : *
98 : : * This ID is set to InvalidTransactionId when the apply worker stops
99 : : * retaining information needed for conflict detection.
100 : : */
101 : : TransactionId oldest_nonremovable_xid;
102 : :
103 : : /* Stats. */
104 : : XLogRecPtr last_lsn;
105 : : TimestampTz last_send_time;
106 : : TimestampTz last_recv_time;
107 : : XLogRecPtr reply_lsn;
108 : : TimestampTz reply_time;
109 : :
110 : : TimestampTz last_seqsync_start_time;
111 : : } LogicalRepWorker;
112 : :
113 : : /*
114 : : * State of the transaction in parallel apply worker.
115 : : *
116 : : * The enum values must have the same order as the transaction state
117 : : * transitions.
118 : : */
119 : : typedef enum ParallelTransState
120 : : {
121 : : PARALLEL_TRANS_UNKNOWN,
122 : : PARALLEL_TRANS_STARTED,
123 : : PARALLEL_TRANS_FINISHED,
124 : : } ParallelTransState;
125 : :
126 : : /*
127 : : * State of fileset used to communicate changes from leader to parallel
128 : : * apply worker.
129 : : *
130 : : * FS_EMPTY indicates an initial state where the leader doesn't need to use
131 : : * the file to communicate with the parallel apply worker.
132 : : *
133 : : * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
134 : : * to the file.
135 : : *
136 : : * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
137 : : * the file.
138 : : *
139 : : * FS_READY indicates that it is now ok for a parallel apply worker to
140 : : * read the file.
141 : : */
142 : : typedef enum PartialFileSetState
143 : : {
144 : : FS_EMPTY,
145 : : FS_SERIALIZE_IN_PROGRESS,
146 : : FS_SERIALIZE_DONE,
147 : : FS_READY,
148 : : } PartialFileSetState;
149 : :
150 : : /*
151 : : * Struct for sharing information between leader apply worker and parallel
152 : : * apply workers.
153 : : */
154 : : typedef struct ParallelApplyWorkerShared
155 : : {
156 : : slock_t mutex;
157 : :
158 : : TransactionId xid;
159 : :
160 : : /*
161 : : * State used to ensure commit ordering.
162 : : *
163 : : * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
164 : : * handling the transaction finish commands while the apply leader will
165 : : * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
166 : : * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
167 : : * STREAM_ABORT).
168 : : */
169 : : ParallelTransState xact_state;
170 : :
171 : : /* Information from the corresponding LogicalRepWorker slot. */
172 : : uint16 logicalrep_worker_generation;
173 : : int logicalrep_worker_slot_no;
174 : :
175 : : /*
176 : : * Indicates whether there are pending streaming blocks in the queue. The
177 : : * parallel apply worker will check it before starting to wait.
178 : : */
179 : : pg_atomic_uint32 pending_stream_count;
180 : :
181 : : /*
182 : : * XactLastCommitEnd from the parallel apply worker. This is required by
183 : : * the leader worker so it can update the lsn_mappings.
184 : : */
185 : : XLogRecPtr last_commit_end;
186 : :
187 : : /*
188 : : * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
189 : : * serialize changes to the file, and share the fileset with the parallel
190 : : * apply worker when processing the transaction finish command. Then the
191 : : * parallel apply worker will apply all the spooled messages.
192 : : *
193 : : * FileSet is used here instead of SharedFileSet because we need it to
194 : : * survive after releasing the shared memory so that the leader apply
195 : : * worker can re-use the same fileset for the next streaming transaction.
196 : : */
197 : : PartialFileSetState fileset_state;
198 : : FileSet fileset;
199 : : } ParallelApplyWorkerShared;
200 : :
201 : : /*
202 : : * Information which is used to manage the parallel apply worker.
203 : : */
204 : : typedef struct ParallelApplyWorkerInfo
205 : : {
206 : : /*
207 : : * This queue is used to send changes from the leader apply worker to the
208 : : * parallel apply worker.
209 : : */
210 : : shm_mq_handle *mq_handle;
211 : :
212 : : /*
213 : : * This queue is used to transfer error messages from the parallel apply
214 : : * worker to the leader apply worker.
215 : : */
216 : : shm_mq_handle *error_mq_handle;
217 : :
218 : : dsm_segment *dsm_seg;
219 : :
220 : : /*
221 : : * Indicates whether the leader apply worker needs to serialize the
222 : : * remaining changes to a file due to timeout when attempting to send data
223 : : * to the parallel apply worker via shared memory.
224 : : */
225 : : bool serialize_changes;
226 : :
227 : : /*
228 : : * True if the worker is being used to process a parallel apply
229 : : * transaction. False indicates this worker is available for re-use.
230 : : */
231 : : bool in_use;
232 : :
233 : : ParallelApplyWorkerShared *shared;
234 : : } ParallelApplyWorkerInfo;
235 : :
236 : : /* Main memory context for apply worker. Permanent during worker lifetime. */
237 : : extern PGDLLIMPORT MemoryContext ApplyContext;
238 : :
239 : : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
240 : :
241 : : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
242 : :
243 : : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
244 : :
245 : : /* libpqreceiver connection */
246 : : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
247 : :
248 : : /* Worker and subscription objects. */
249 : : extern PGDLLIMPORT Subscription *MySubscription;
250 : : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
251 : :
252 : : extern PGDLLIMPORT bool in_remote_transaction;
253 : :
254 : : extern PGDLLIMPORT bool InitializingApplyWorker;
255 : :
256 : : extern PGDLLIMPORT List *table_states_not_ready;
257 : :
258 : : extern void logicalrep_worker_attach(int slot);
259 : : extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
260 : : Oid subid, Oid relid,
261 : : bool only_running);
262 : : extern List *logicalrep_workers_find(Oid subid, bool only_running,
263 : : bool acquire_lock);
264 : : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
265 : : Oid dbid, Oid subid, const char *subname,
266 : : Oid userid, Oid relid,
267 : : dsm_handle subworker_dsm,
268 : : bool retain_dead_tuples);
269 : : extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
270 : : Oid relid);
271 : : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
272 : : extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
273 : : Oid relid);
274 : : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
275 : :
276 : : extern void logicalrep_reset_seqsync_start_time(void);
277 : : extern int logicalrep_sync_worker_count(Oid subid);
278 : :
279 : : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
280 : : char *originname, Size szoriginname);
281 : :
282 : : extern bool AllTablesyncsReady(void);
283 : : extern bool HasSubscriptionTablesCached(void);
284 : : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
285 : :
286 : : extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
287 : : extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
288 : : extern void ProcessSequencesForSync(void);
289 : :
290 : : pg_noreturn extern void FinishSyncWorker(void);
291 : : extern void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
292 : : uint32 hashvalue);
293 : : extern void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers,
294 : : Oid relid, TimestampTz *last_start_time);
295 : : extern void ProcessSyncingRelations(XLogRecPtr current_lsn);
296 : : extern void FetchRelationStates(bool *has_pending_subtables,
297 : : bool *has_pending_subsequences, bool *started_tx);
298 : :
299 : : extern void stream_start_internal(TransactionId xid, bool first_segment);
300 : : extern void stream_stop_internal(TransactionId xid);
301 : :
302 : : /* Common streaming function to apply all the spooled messages */
303 : : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
304 : : XLogRecPtr lsn);
305 : :
306 : : extern void apply_dispatch(StringInfo s);
307 : :
308 : : extern void maybe_reread_subscription(void);
309 : :
310 : : extern void stream_cleanup_files(Oid subid, TransactionId xid);
311 : :
312 : : extern void set_stream_options(WalRcvStreamOptions *options,
313 : : char *slotname,
314 : : XLogRecPtr *origin_startpos);
315 : :
316 : : extern void start_apply(XLogRecPtr origin_startpos);
317 : :
318 : : extern void InitializeLogRepWorker(void);
319 : :
320 : : extern void SetupApplyOrSyncWorker(int worker_slot);
321 : :
322 : : extern void DisableSubscriptionAndExit(void);
323 : :
324 : : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
325 : :
326 : : /* Function for apply error callback */
327 : : extern void apply_error_callback(void *arg);
328 : : extern void set_apply_error_context_origin(char *originname);
329 : :
330 : : /* Parallel apply worker setup and interactions */
331 : : extern void pa_allocate_worker(TransactionId xid);
332 : : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
333 : : extern void pa_detach_all_error_mq(void);
334 : :
335 : : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
336 : : const void *data);
337 : : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
338 : : bool stream_locked);
339 : :
340 : : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
341 : : ParallelTransState xact_state);
342 : : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
343 : :
344 : : extern void pa_start_subtrans(TransactionId current_xid,
345 : : TransactionId top_xid);
346 : : extern void pa_reset_subtrans(void);
347 : : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
348 : : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
349 : : PartialFileSetState fileset_state);
350 : :
351 : : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
352 : : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
353 : :
354 : : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
355 : : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
356 : :
357 : : extern void pa_decr_and_wait_stream_block(void);
358 : :
359 : : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
360 : : XLogRecPtr remote_lsn);
361 : :
362 : : #define isParallelApplyWorker(worker) ((worker)->in_use && \
363 : : (worker)->type == WORKERTYPE_PARALLEL_APPLY)
364 : : #define isTableSyncWorker(worker) ((worker)->in_use && \
365 : : (worker)->type == WORKERTYPE_TABLESYNC)
366 : : #define isSequenceSyncWorker(worker) ((worker)->in_use && \
367 : : (worker)->type == WORKERTYPE_SEQUENCESYNC)
368 : :
369 : : static inline bool
3330 peter_e@gmx.net 370 :CBC 1542 : am_tablesync_worker(void)
371 : : {
181 akapila@postgresql.o 372 [ + - + + ]:GNC 1542 : return isTableSyncWorker(MyLogicalRepWorker);
373 : : }
374 : :
375 : : static inline bool
376 : 1090 : am_sequencesync_worker(void)
377 : : {
378 [ + - + + ]: 1090 : return isSequenceSyncWorker(MyLogicalRepWorker);
379 : : }
380 : :
381 : : static inline bool
1212 akapila@postgresql.o 382 :CBC 229868 : am_leader_apply_worker(void)
383 : : {
984 384 [ - + ]: 229868 : Assert(MyLogicalRepWorker->in_use);
995 385 : 229868 : return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
386 : : }
387 : :
388 : : static inline bool
1212 389 : 347847 : am_parallel_apply_worker(void)
390 : : {
984 391 [ - + ]: 347847 : Assert(MyLogicalRepWorker->in_use);
1212 392 [ + - + + ]: 347847 : return isParallelApplyWorker(MyLogicalRepWorker);
393 : : }
394 : :
395 : : static inline LogicalRepWorkerType
74 akapila@postgresql.o 396 :GNC 113 : get_logical_worker_type(void)
397 : : {
398 [ - + ]: 113 : Assert(MyLogicalRepWorker->in_use);
399 : 113 : return MyLogicalRepWorker->type;
400 : : }
401 : :
402 : : #endif /* WORKER_INTERNAL_H */
|