Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * repack_worker.c
4 : : * Implementation of the background worker for ad-hoc logical decoding
5 : : * during REPACK (CONCURRENTLY).
6 : : *
7 : : *
8 : : * Copyright (c) 2026, PostgreSQL Global Development Group
9 : : *
10 : : *
11 : : * IDENTIFICATION
12 : : * src/backend/commands/repack_worker.c
13 : : *
14 : : *-------------------------------------------------------------------------
15 : : */
16 : : #include "postgres.h"
17 : :
18 : : #include "access/table.h"
19 : : #include "access/xlog_internal.h"
20 : : #include "access/xlogutils.h"
21 : : #include "access/xlogwait.h"
22 : : #include "commands/repack.h"
23 : : #include "commands/repack_internal.h"
24 : : #include "libpq/pqmq.h"
25 : : #include "replication/snapbuild.h"
26 : : #include "storage/ipc.h"
27 : : #include "storage/proc.h"
28 : : #include "tcop/tcopprot.h"
29 : : #include "utils/memutils.h"
30 : :
31 : : #define REPL_PLUGIN_NAME "pgrepack"
32 : :
33 : : static void RepackWorkerShutdown(int code, Datum arg);
34 : : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
35 : : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
36 : : static void export_initial_snapshot(Snapshot snapshot,
37 : : DecodingWorkerShared *shared);
38 : : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
39 : : DecodingWorkerShared *shared);
40 : :
41 : : /* Is this process a REPACK worker? */
42 : : static bool am_repack_worker = false;
43 : :
44 : : /* The WAL segment being decoded. */
45 : : static XLogSegNo repack_current_segment = 0;
46 : :
47 : : /* Our DSM segment, for shutting down */
48 : : static dsm_segment *worker_dsm_segment = NULL;
49 : :
50 : : /*
51 : : * Keep track of the table we're processing, to skip logical decoding of data
52 : : * from other relations.
53 : : */
54 : : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
55 : : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
56 : :
57 : :
58 : : /* REPACK decoding worker entry point */
59 : : void
29 alvherre@kurilemu.de 60 :GNC 3 : RepackWorkerMain(Datum main_arg)
61 : : {
62 : : dsm_segment *seg;
63 : : DecodingWorkerShared *shared;
64 : : shm_mq *mq;
65 : : shm_mq_handle *mqh;
66 : : LogicalDecodingContext *decoding_ctx;
67 : : SharedFileSet *sfs;
68 : : Snapshot snapshot;
69 : :
70 : 3 : am_repack_worker = true;
71 : :
72 : : /*
73 : : * Override the default bgworker_die() with die() so we can use
74 : : * CHECK_FOR_INTERRUPTS().
75 : : */
76 : 3 : pqsignal(SIGTERM, die);
77 : 3 : BackgroundWorkerUnblockSignals();
78 : :
79 : 3 : seg = dsm_attach(DatumGetUInt32(main_arg));
80 [ - + ]: 3 : if (seg == NULL)
29 alvherre@kurilemu.de 81 [ # # ]:UNC 0 : ereport(ERROR,
82 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
83 : : errmsg("could not map dynamic shared memory segment"));
19 alvherre@kurilemu.de 84 :GNC 3 : worker_dsm_segment = seg;
85 : :
29 86 : 3 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
87 : :
88 : : /* Arrange to signal the leader if we exit. */
89 : 3 : before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
90 : :
91 : : /*
92 : : * Join locking group - see the comments around the call of
93 : : * start_repack_decoding_worker().
94 : : */
95 [ - + ]: 3 : if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
29 alvherre@kurilemu.de 96 :UNC 0 : return; /* The leader is not running anymore. */
97 : :
98 : : /*
99 : : * Setup a queue to send error messages to the backend that launched this
100 : : * worker.
101 : : */
29 alvherre@kurilemu.de 102 :GNC 3 : mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
103 : 3 : shm_mq_set_sender(mq, MyProc);
104 : 3 : mqh = shm_mq_attach(mq, seg, NULL);
105 : 3 : pq_redirect_to_shm_mq(seg, mqh);
106 : 3 : pq_set_parallel_leader(shared->backend_pid,
107 : : shared->backend_proc_number);
108 : :
109 : : /* Connect to the database. LOGIN is not required. */
15 110 : 3 : BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
111 : : BGWORKER_BYPASS_ROLELOGINCHECK);
112 : :
113 : : /*
114 : : * Transaction is needed to open relation, and it also provides us with a
115 : : * resource owner.
116 : : */
29 117 : 3 : StartTransactionCommand();
118 : :
119 : 3 : shared = (DecodingWorkerShared *) dsm_segment_address(seg);
120 : :
121 : : /*
122 : : * Not sure the spinlock is needed here - the backend should not change
123 : : * anything in the shared memory until we have serialized the snapshot.
124 : : */
125 : 3 : SpinLockAcquire(&shared->mutex);
126 [ - + ]: 3 : Assert(!XLogRecPtrIsValid(shared->lsn_upto));
127 : 3 : sfs = &shared->sfs;
128 : 3 : SpinLockRelease(&shared->mutex);
129 : :
130 : 3 : SharedFileSetAttach(sfs, seg);
131 : :
132 : : /*
133 : : * Prepare to capture the concurrent data changes ourselves.
134 : : */
135 : 3 : decoding_ctx = repack_setup_logical_decoding(shared->relid);
136 : :
137 : : /* Announce that we're ready. */
138 : 3 : SpinLockAcquire(&shared->mutex);
139 : 3 : shared->initialized = true;
140 : 3 : SpinLockRelease(&shared->mutex);
141 : 3 : ConditionVariableSignal(&shared->cv);
142 : :
143 : : /* There doesn't seem to a nice API to set these */
144 : 3 : XactIsoLevel = XACT_REPEATABLE_READ;
145 : 3 : XactReadOnly = true;
146 : :
147 : : /* Build the initial snapshot and export it. */
148 : 3 : snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
149 : 3 : export_initial_snapshot(snapshot, shared);
150 : :
151 : : /*
152 : : * Only historic snapshots should be used now. Do not let us restrict the
153 : : * progress of xmin horizon.
154 : : */
155 : 3 : InvalidateCatalogSnapshot();
156 : :
157 : : for (;;)
158 : 3 : {
159 : 6 : bool stop = decode_concurrent_changes(decoding_ctx, shared);
160 : :
161 [ + + ]: 6 : if (stop)
162 : 3 : break;
163 : :
164 : : }
165 : :
166 : : /* Cleanup. */
167 : 3 : repack_cleanup_logical_decoding(decoding_ctx);
168 : 3 : CommitTransactionCommand();
169 : : }
170 : :
171 : : /*
172 : : * See ParallelWorkerShutdown for details.
173 : : */
174 : : static void
175 : 3 : RepackWorkerShutdown(int code, Datum arg)
176 : : {
177 : 3 : DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
178 : :
179 : 3 : SendProcSignal(shared->backend_pid,
180 : : PROCSIG_REPACK_MESSAGE,
181 : : shared->backend_proc_number);
182 : :
19 183 : 3 : dsm_detach(worker_dsm_segment);
29 184 : 3 : }
185 : :
186 : : bool
187 : 2019 : AmRepackWorker(void)
188 : : {
189 : 2019 : return am_repack_worker;
190 : : }
191 : :
192 : : /*
193 : : * This function is much like pg_create_logical_replication_slot() except that
194 : : * the new slot is neither released (if anyone else could read changes from
195 : : * our slot, we could miss changes other backends do while we copy the
196 : : * existing data into temporary table), nor persisted (it's easier to handle
197 : : * crash by restarting all the work from scratch).
198 : : */
199 : : static LogicalDecodingContext *
200 : 3 : repack_setup_logical_decoding(Oid relid)
201 : : {
202 : : Relation rel;
203 : : Oid toastrelid;
204 : : LogicalDecodingContext *ctx;
205 : : NameData slotname;
206 : : RepackDecodingState *dstate;
207 : : MemoryContext oldcxt;
208 : :
209 : : /*
210 : : * REPACK CONCURRENTLY is not allowed in a transaction block, so this
211 : : * should never fire.
212 : : */
213 [ - + ]: 3 : Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
214 : :
215 : : /*
216 : : * Make sure we can use logical decoding.
217 : : */
28 218 : 3 : CheckLogicalDecodingRequirements(true);
219 : :
220 : : /*
221 : : * A single backend should not execute multiple REPACK commands at a time,
222 : : * so use PID to make the slot unique.
223 : : *
224 : : * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
225 : : */
29 226 : 3 : snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
28 227 : 3 : ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
228 : : false, false);
229 : :
29 230 : 3 : EnsureLogicalDecodingEnabled();
231 : :
232 : : /*
233 : : * Neither prepare_write nor do_write callback nor update_progress is
234 : : * useful for us.
235 : : */
236 : 3 : ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
237 : : NIL,
238 : : true,
239 : : true,
240 : : InvalidXLogRecPtr,
241 : 3 : XL_ROUTINE(.page_read = read_local_xlog_page,
242 : : .segment_open = wal_segment_open,
243 : : .segment_close = wal_segment_close),
244 : : NULL, NULL, NULL);
245 : :
246 : : /*
247 : : * We don't have control on setting fast_forward, so at least check it.
248 : : */
249 [ - + ]: 3 : Assert(!ctx->fast_forward);
250 : :
251 : : /* Avoid logical decoding of other relations. */
252 : 3 : rel = table_open(relid, AccessShareLock);
253 : 3 : repacked_rel_locator = rel->rd_locator;
254 : 3 : toastrelid = rel->rd_rel->reltoastrelid;
255 [ + + ]: 3 : if (OidIsValid(toastrelid))
256 : : {
257 : : Relation toastrel;
258 : :
259 : : /* Avoid logical decoding of other TOAST relations. */
260 : 1 : toastrel = table_open(toastrelid, AccessShareLock);
261 : 1 : repacked_rel_toast_locator = toastrel->rd_locator;
262 : 1 : table_close(toastrel, AccessShareLock);
263 : : }
264 : 3 : table_close(rel, AccessShareLock);
265 : :
266 : 3 : DecodingContextFindStartpoint(ctx);
267 : :
268 : : /*
269 : : * decode_concurrent_changes() needs non-blocking callback.
270 : : */
271 : 3 : ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
272 : :
273 : : /* Some WAL records should have been read. */
19 fujii@postgresql.org 274 [ - + ]: 3 : Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
275 : :
276 : : /*
277 : : * Initialize repack_current_segment so that we can notice WAL segment
278 : : * boundaries.
279 : : */
29 alvherre@kurilemu.de 280 : 3 : XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
281 : : wal_segment_size);
282 : :
283 : : /* Our private state belongs to the decoding context. */
284 : 3 : oldcxt = MemoryContextSwitchTo(ctx->context);
285 : :
286 : : /*
287 : : * read_local_xlog_page_no_wait() needs to be able to indicate the end of
288 : : * WAL.
289 : : */
290 : 3 : ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
291 : 3 : dstate = palloc0_object(RepackDecodingState);
292 : 3 : MemoryContextSwitchTo(oldcxt);
293 : :
294 : : #ifdef USE_ASSERT_CHECKING
295 : 3 : dstate->relid = relid;
296 : : #endif
297 : :
298 : 3 : dstate->change_cxt = AllocSetContextCreate(ctx->context,
299 : : "REPACK - change",
300 : : ALLOCSET_DEFAULT_SIZES);
301 : :
302 : : /* The file will be set as soon as we have it opened. */
303 : 3 : dstate->file = NULL;
304 : :
305 : : /*
306 : : * Memory context and resource owner for long-lived resources.
307 : : */
308 : 3 : dstate->worker_cxt = CurrentMemoryContext;
309 : 3 : dstate->worker_resowner = CurrentResourceOwner;
310 : :
311 : 3 : ctx->output_writer_private = dstate;
312 : :
313 : 3 : return ctx;
314 : : }
315 : :
316 : : static void
317 : 3 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
318 : : {
319 : : RepackDecodingState *dstate;
320 : :
321 : 3 : dstate = (RepackDecodingState *) ctx->output_writer_private;
322 [ + + ]: 3 : if (dstate->slot)
323 : 1 : ExecDropSingleTupleTableSlot(dstate->slot);
324 : :
325 : 3 : FreeDecodingContext(ctx);
326 : 3 : ReplicationSlotDropAcquired();
327 : 3 : }
328 : :
329 : : /*
330 : : * Make snapshot available to the backend that launched the decoding worker.
331 : : */
332 : : static void
333 : 3 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
334 : : {
335 : : char fname[MAXPGPATH];
336 : : BufFile *file;
337 : : Size snap_size;
338 : : char *snap_space;
339 : :
340 : 3 : snap_size = EstimateSnapshotSpace(snapshot);
341 : 3 : snap_space = (char *) palloc(snap_size);
342 : 3 : SerializeSnapshot(snapshot, snap_space);
343 : :
344 : 3 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
345 : 3 : file = BufFileCreateFileSet(&shared->sfs.fs, fname);
346 : : /* To make restoration easier, write the snapshot size first. */
347 : 3 : BufFileWrite(file, &snap_size, sizeof(snap_size));
348 : 3 : BufFileWrite(file, snap_space, snap_size);
349 : 3 : BufFileClose(file);
28 350 : 3 : pfree(snap_space);
351 : :
352 : : /* Increase the counter to tell the backend that the file is available. */
29 353 : 3 : SpinLockAcquire(&shared->mutex);
354 : 3 : shared->last_exported++;
355 : 3 : SpinLockRelease(&shared->mutex);
356 : 3 : ConditionVariableSignal(&shared->cv);
357 : 3 : }
358 : :
359 : : /*
360 : : * Decode logical changes from the WAL sequence and store them to a file.
361 : : *
362 : : * If true is returned, there is no more work for the worker.
363 : : */
364 : : static bool
365 : 6 : decode_concurrent_changes(LogicalDecodingContext *ctx,
366 : : DecodingWorkerShared *shared)
367 : : {
368 : : RepackDecodingState *dstate;
369 : : XLogRecPtr lsn_upto;
370 : : bool done;
371 : : char fname[MAXPGPATH];
372 : :
373 : 6 : dstate = (RepackDecodingState *) ctx->output_writer_private;
374 : :
375 : : /* Open the output file. */
376 : 6 : DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
377 : 6 : dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
378 : :
379 : 6 : SpinLockAcquire(&shared->mutex);
380 : 6 : lsn_upto = shared->lsn_upto;
381 : 6 : done = shared->done;
382 : 6 : SpinLockRelease(&shared->mutex);
383 : :
384 : : while (true)
385 : 872 : {
386 : : XLogRecord *record;
387 : : XLogSegNo segno_new;
388 : 878 : char *errm = NULL;
389 : : XLogRecPtr end_lsn;
390 : :
391 [ - + ]: 878 : CHECK_FOR_INTERRUPTS();
392 : :
393 : 878 : record = XLogReadRecord(ctx->reader, &errm);
394 [ + + ]: 878 : if (record)
395 : : {
396 : 863 : LogicalDecodingProcessRecord(ctx, ctx->reader);
397 : :
398 : : /*
399 : : * If WAL segment boundary has been crossed, inform the decoding
400 : : * system that the catalog_xmin can advance.
401 : : */
402 : 863 : end_lsn = ctx->reader->EndRecPtr;
403 : 863 : XLByteToSeg(end_lsn, segno_new, wal_segment_size);
404 [ - + ]: 863 : if (segno_new != repack_current_segment)
405 : : {
29 alvherre@kurilemu.de 406 :UNC 0 : LogicalConfirmReceivedLocation(end_lsn);
407 [ # # ]: 0 : elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
408 : : (uint32) (end_lsn >> 32), (uint32) end_lsn);
409 : 0 : repack_current_segment = segno_new;
410 : : }
411 : : }
412 : : else
413 : : {
414 : : ReadLocalXLogPageNoWaitPrivate *priv;
415 : :
29 alvherre@kurilemu.de 416 [ - + ]:GNC 15 : if (errm)
29 alvherre@kurilemu.de 417 [ # # ]:UNC 0 : ereport(ERROR,
418 : : errmsg("%s", errm));
419 : :
420 : : /*
421 : : * In the decoding loop we do not want to get blocked when there
422 : : * is no more WAL available, otherwise the loop would become
423 : : * uninterruptible.
424 : : */
29 alvherre@kurilemu.de 425 :GNC 15 : priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
426 [ + - ]: 15 : if (priv->end_of_wal)
427 : : /* Do not miss the end of WAL condition next time. */
428 : 15 : priv->end_of_wal = false;
429 : : else
29 alvherre@kurilemu.de 430 [ # # ]:UNC 0 : ereport(ERROR,
431 : : errmsg("could not read WAL record"));
432 : : }
433 : :
434 : : /*
435 : : * Whether we could read new record or not, keep checking if
436 : : * 'lsn_upto' was specified.
437 : : */
29 alvherre@kurilemu.de 438 [ + + ]:GNC 878 : if (!XLogRecPtrIsValid(lsn_upto))
439 : : {
440 : 633 : SpinLockAcquire(&shared->mutex);
441 : 633 : lsn_upto = shared->lsn_upto;
442 : : /* 'done' should be set at the same time as 'lsn_upto' */
443 : 633 : done = shared->done;
444 : 633 : SpinLockRelease(&shared->mutex);
445 : : }
446 [ + + ]: 878 : if (XLogRecPtrIsValid(lsn_upto) &&
447 [ + + ]: 251 : ctx->reader->EndRecPtr >= lsn_upto)
448 : 6 : break;
449 : :
450 [ + + ]: 872 : if (record == NULL)
451 : : {
452 : 13 : int64 timeout = 0;
453 : : WaitLSNResult res;
454 : :
455 : : /*
456 : : * Before we retry reading, wait until new WAL is flushed.
457 : : *
458 : : * There is a race condition such that the backend executing
459 : : * REPACK determines 'lsn_upto', but before it sets the shared
460 : : * variable, we reach the end of WAL. In that case we'd need to
461 : : * wait until the next WAL flush (unrelated to REPACK). Although
462 : : * that should not be a problem in a busy system, it might be
463 : : * noticeable in other cases, including regression tests (which
464 : : * are not necessarily executed in parallel). Therefore it makes
465 : : * sense to use timeout.
466 : : *
467 : : * If lsn_upto is valid, WAL records having LSN lower than that
468 : : * should already have been flushed to disk.
469 : : */
470 [ + - ]: 13 : if (!XLogRecPtrIsValid(lsn_upto))
471 : 13 : timeout = 100L;
472 : 13 : res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
473 : 13 : ctx->reader->EndRecPtr + 1,
474 : : timeout);
475 [ + + - + ]: 13 : if (res != WAIT_LSN_RESULT_SUCCESS &&
476 : : res != WAIT_LSN_RESULT_TIMEOUT)
29 alvherre@kurilemu.de 477 [ # # ]:UNC 0 : ereport(ERROR,
478 : : errmsg("waiting for WAL failed"));
479 : : }
480 : : }
481 : :
482 : : /*
483 : : * Close the file so we can make it available to the backend.
484 : : */
29 alvherre@kurilemu.de 485 :GNC 6 : BufFileClose(dstate->file);
486 : 6 : dstate->file = NULL;
487 : 6 : SpinLockAcquire(&shared->mutex);
488 : 6 : shared->lsn_upto = InvalidXLogRecPtr;
489 : 6 : shared->last_exported++;
490 : 6 : SpinLockRelease(&shared->mutex);
491 : 6 : ConditionVariableSignal(&shared->cv);
492 : :
493 : 6 : return done;
494 : : }
495 : :
496 : : /*
497 : : * Does the WAL record contain a data change that this backend does not need
498 : : * to decode on behalf of REPACK (CONCURRENTLY)?
499 : : */
500 : : bool
501 : 1445965 : change_useless_for_repack(XLogRecordBuffer *buf)
502 : : {
503 : 1445965 : XLogReaderState *r = buf->record;
504 : : RelFileLocator locator;
505 : :
506 : : /* TOAST locator should not be set unless the main is. */
507 [ + + - + ]: 1445965 : Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
508 : : OidIsValid(repacked_rel_locator.relNumber));
509 : :
510 : : /*
511 : : * Backends not involved in REPACK (CONCURRENTLY) should not do the
512 : : * filtering.
513 : : */
514 [ + + ]: 1445965 : if (!OidIsValid(repacked_rel_locator.relNumber))
515 : 1445629 : return false;
516 : :
517 : : /*
518 : : * If the record does not contain the block 0, it's probably not INSERT /
519 : : * UPDATE / DELETE. In any case, we do not have enough information to
520 : : * filter the change out.
521 : : */
522 [ - + ]: 336 : if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
29 alvherre@kurilemu.de 523 :UNC 0 : return false;
524 : :
525 : : /*
526 : : * Decode the change if it belongs to the table we are repacking, or if it
527 : : * belongs to its TOAST relation.
528 : : */
29 alvherre@kurilemu.de 529 [ + + + - :GNC 336 : if (RelFileLocatorEquals(locator, repacked_rel_locator))
+ - ]
530 : 31 : return false;
531 [ + + ]: 305 : if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
532 [ + + + - : 240 : RelFileLocatorEquals(locator, repacked_rel_toast_locator))
+ - ]
533 : 44 : return false;
534 : :
535 : : /* Filter out changes of other tables. */
536 : 261 : return true;
537 : : }
|