LCOV - differential code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit UNC GNC
Current: bed3ffbf9d952be6c7d739d068cdce44c046dfb7 vs 574581b50ac9c63dd9e4abebb731a3b67e5b50f6 Lines: 94.5 % 165 156 9 156
Current Date: 2026-05-05 10:23:31 +0900 Functions: 100.0 % 8 8 8
Baseline: lcov-20260505-025707-baseline Branches: 62.2 % 74 46 28 46
Baseline Date: 2026-05-05 10:27:06 +0900 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 94.5 % 165 156 9 156
Function coverage date bins:
(7,30] days: 100.0 % 8 8 8
Branch coverage date bins:
(7,30] days: 62.2 % 74 46 28 46

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * repack_worker.c
                                  4                 :                :  *    Implementation of the background worker for ad-hoc logical decoding
                                  5                 :                :  *    during REPACK (CONCURRENTLY).
                                  6                 :                :  *
                                  7                 :                :  *
                                  8                 :                :  * Copyright (c) 2026, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  *
                                 11                 :                :  * IDENTIFICATION
                                 12                 :                :  *    src/backend/commands/repack_worker.c
                                 13                 :                :  *
                                 14                 :                :  *-------------------------------------------------------------------------
                                 15                 :                :  */
                                 16                 :                : #include "postgres.h"
                                 17                 :                : 
                                 18                 :                : #include "access/table.h"
                                 19                 :                : #include "access/xlog_internal.h"
                                 20                 :                : #include "access/xlogutils.h"
                                 21                 :                : #include "access/xlogwait.h"
                                 22                 :                : #include "commands/repack.h"
                                 23                 :                : #include "commands/repack_internal.h"
                                 24                 :                : #include "libpq/pqmq.h"
                                 25                 :                : #include "replication/snapbuild.h"
                                 26                 :                : #include "storage/ipc.h"
                                 27                 :                : #include "storage/proc.h"
                                 28                 :                : #include "tcop/tcopprot.h"
                                 29                 :                : #include "utils/memutils.h"
                                 30                 :                : 
                                 31                 :                : #define REPL_PLUGIN_NAME   "pgrepack"
                                 32                 :                : 
                                 33                 :                : static void RepackWorkerShutdown(int code, Datum arg);
                                 34                 :                : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
                                 35                 :                : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
                                 36                 :                : static void export_initial_snapshot(Snapshot snapshot,
                                 37                 :                :                                     DecodingWorkerShared *shared);
                                 38                 :                : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
                                 39                 :                :                                       DecodingWorkerShared *shared);
                                 40                 :                : 
                                 41                 :                : /* Is this process a REPACK worker? */
                                 42                 :                : static bool am_repack_worker = false;
                                 43                 :                : 
                                 44                 :                : /* The WAL segment being decoded. */
                                 45                 :                : static XLogSegNo repack_current_segment = 0;
                                 46                 :                : 
                                 47                 :                : /* Our DSM segment, for shutting down */
                                 48                 :                : static dsm_segment *worker_dsm_segment = NULL;
                                 49                 :                : 
                                 50                 :                : /*
                                 51                 :                :  * Keep track of the table we're processing, to skip logical decoding of data
                                 52                 :                :  * from other relations.
                                 53                 :                :  */
                                 54                 :                : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
                                 55                 :                : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
                                 56                 :                : 
                                 57                 :                : 
                                 58                 :                : /* REPACK decoding worker entry point */
                                 59                 :                : void
   29 alvherre@kurilemu.de       60                 :GNC           3 : RepackWorkerMain(Datum main_arg)
                                 61                 :                : {
                                 62                 :                :     dsm_segment *seg;
                                 63                 :                :     DecodingWorkerShared *shared;
                                 64                 :                :     shm_mq     *mq;
                                 65                 :                :     shm_mq_handle *mqh;
                                 66                 :                :     LogicalDecodingContext *decoding_ctx;
                                 67                 :                :     SharedFileSet *sfs;
                                 68                 :                :     Snapshot    snapshot;
                                 69                 :                : 
                                 70                 :              3 :     am_repack_worker = true;
                                 71                 :                : 
                                 72                 :                :     /*
                                 73                 :                :      * Override the default bgworker_die() with die() so we can use
                                 74                 :                :      * CHECK_FOR_INTERRUPTS().
                                 75                 :                :      */
                                 76                 :              3 :     pqsignal(SIGTERM, die);
                                 77                 :              3 :     BackgroundWorkerUnblockSignals();
                                 78                 :                : 
                                 79                 :              3 :     seg = dsm_attach(DatumGetUInt32(main_arg));
                                 80         [ -  + ]:              3 :     if (seg == NULL)
   29 alvherre@kurilemu.de       81         [ #  # ]:UNC           0 :         ereport(ERROR,
                                 82                 :                :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 83                 :                :                 errmsg("could not map dynamic shared memory segment"));
   19 alvherre@kurilemu.de       84                 :GNC           3 :     worker_dsm_segment = seg;
                                 85                 :                : 
   29                            86                 :              3 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                 87                 :                : 
                                 88                 :                :     /* Arrange to signal the leader if we exit. */
                                 89                 :              3 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
                                 90                 :                : 
                                 91                 :                :     /*
                                 92                 :                :      * Join locking group - see the comments around the call of
                                 93                 :                :      * start_repack_decoding_worker().
                                 94                 :                :      */
                                 95         [ -  + ]:              3 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
   29 alvherre@kurilemu.de       96                 :UNC           0 :         return;                 /* The leader is not running anymore. */
                                 97                 :                : 
                                 98                 :                :     /*
                                 99                 :                :      * Set up a queue to send error messages to the backend that launched this
                                100                 :                :      * worker.
                                101                 :                :      */
   29 alvherre@kurilemu.de      102                 :GNC           3 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
                                103                 :              3 :     shm_mq_set_sender(mq, MyProc);
                                104                 :              3 :     mqh = shm_mq_attach(mq, seg, NULL);
                                105                 :              3 :     pq_redirect_to_shm_mq(seg, mqh);
                                106                 :              3 :     pq_set_parallel_leader(shared->backend_pid,
                                107                 :                :                            shared->backend_proc_number);
                                108                 :                : 
                                109                 :                :     /* Connect to the database. LOGIN is not required. */
   15                           110                 :              3 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
                                111                 :                :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
                                112                 :                : 
                                113                 :                :     /*
                                114                 :                :      * Transaction is needed to open relation, and it also provides us with a
                                115                 :                :      * resource owner.
                                116                 :                :      */
   29                           117                 :              3 :     StartTransactionCommand();
                                118                 :                : 
                                119                 :              3 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                120                 :                : 
                                121                 :                :     /*
                                122                 :                :      * Not sure the spinlock is needed here - the backend should not change
                                123                 :                :      * anything in the shared memory until we have serialized the snapshot.
                                124                 :                :      */
                                125                 :              3 :     SpinLockAcquire(&shared->mutex);
                                126         [ -  + ]:              3 :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
                                127                 :              3 :     sfs = &shared->sfs;
                                128                 :              3 :     SpinLockRelease(&shared->mutex);
                                129                 :                : 
                                130                 :              3 :     SharedFileSetAttach(sfs, seg);
                                131                 :                : 
                                132                 :                :     /*
                                133                 :                :      * Prepare to capture the concurrent data changes ourselves.
                                134                 :                :      */
                                135                 :              3 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
                                136                 :                : 
                                137                 :                :     /* Announce that we're ready. */
                                138                 :              3 :     SpinLockAcquire(&shared->mutex);
                                139                 :              3 :     shared->initialized = true;
                                140                 :              3 :     SpinLockRelease(&shared->mutex);
                                141                 :              3 :     ConditionVariableSignal(&shared->cv);
                                142                 :                : 
                                143                 :                :     /* There doesn't seem to be a nice API to set these */
                                144                 :              3 :     XactIsoLevel = XACT_REPEATABLE_READ;
                                145                 :              3 :     XactReadOnly = true;
                                146                 :                : 
                                147                 :                :     /* Build the initial snapshot and export it. */
                                148                 :              3 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
                                149                 :              3 :     export_initial_snapshot(snapshot, shared);
                                150                 :                : 
                                151                 :                :     /*
                                152                 :                :      * Only historic snapshots should be used now. Do not let us restrict the
                                153                 :                :      * progress of the xmin horizon.
                                154                 :                :      */
                                155                 :              3 :     InvalidateCatalogSnapshot();
                                156                 :                : 
                                157                 :                :     for (;;)
                                158                 :              3 :     {
                                159                 :              6 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
                                160                 :                : 
                                161         [ +  + ]:              6 :         if (stop)
                                162                 :              3 :             break;
                                163                 :                : 
                                164                 :                :     }
                                165                 :                : 
                                166                 :                :     /* Cleanup. */
                                167                 :              3 :     repack_cleanup_logical_decoding(decoding_ctx);
                                168                 :              3 :     CommitTransactionCommand();
                                169                 :                : }
                                170                 :                : 
                                171                 :                : /*
                                172                 :                :  * See ParallelWorkerShutdown for details.
                                173                 :                :  */
                                174                 :                : static void
                                175                 :              3 : RepackWorkerShutdown(int code, Datum arg)
                                176                 :                : {
                                177                 :              3 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
                                178                 :                : 
                                179                 :              3 :     SendProcSignal(shared->backend_pid,
                                180                 :                :                    PROCSIG_REPACK_MESSAGE,
                                181                 :                :                    shared->backend_proc_number);
                                182                 :                : 
   19                           183                 :              3 :     dsm_detach(worker_dsm_segment);
   29                           184                 :              3 : }
                                185                 :                : 
                                186                 :                : bool
                                187                 :           2019 : AmRepackWorker(void)
                                188                 :                : {
                                189                 :           2019 :     return am_repack_worker;
                                190                 :                : }
                                191                 :                : 
                                192                 :                : /*
                                193                 :                :  * This function is much like pg_create_logical_replication_slot() except that
                                194                 :                :  * the new slot is neither released (if anyone else could read changes from
                                195                 :                :  * our slot, we could miss changes other backends do while we copy the
                                196                 :                :  * existing data into a temporary table), nor persisted (it's easier to handle
                                197                 :                :  * crash by restarting all the work from scratch).
                                198                 :                :  */
                                199                 :                : static LogicalDecodingContext *
                                200                 :              3 : repack_setup_logical_decoding(Oid relid)
                                201                 :                : {
                                202                 :                :     Relation    rel;
                                203                 :                :     Oid         toastrelid;
                                204                 :                :     LogicalDecodingContext *ctx;
                                205                 :                :     NameData    slotname;
                                206                 :                :     RepackDecodingState *dstate;
                                207                 :                :     MemoryContext oldcxt;
                                208                 :                : 
                                209                 :                :     /*
                                210                 :                :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
                                211                 :                :      * should never fire.
                                212                 :                :      */
                                213         [ -  + ]:              3 :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
                                214                 :                : 
                                215                 :                :     /*
                                216                 :                :      * Make sure we can use logical decoding.
                                217                 :                :      */
   28                           218                 :              3 :     CheckLogicalDecodingRequirements(true);
                                219                 :                : 
                                220                 :                :     /*
                                221                 :                :      * A single backend should not execute multiple REPACK commands at a time,
                                222                 :                :      * so use PID to make the slot unique.
                                223                 :                :      *
                                224                 :                :      * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
                                225                 :                :      */
   29                           226                 :              3 :     snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
   28                           227                 :              3 :     ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
                                228                 :                :                           false, false);
                                229                 :                : 
   29                           230                 :              3 :     EnsureLogicalDecodingEnabled();
                                231                 :                : 
                                232                 :                :     /*
                                233                 :                :      * Neither prepare_write nor do_write callback nor update_progress is
                                234                 :                :      * useful for us.
                                235                 :                :      */
                                236                 :              3 :     ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
                                237                 :                :                                     NIL,
                                238                 :                :                                     true,
                                239                 :                :                                     true,
                                240                 :                :                                     InvalidXLogRecPtr,
                                241                 :              3 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                242                 :                :                                                .segment_open = wal_segment_open,
                                243                 :                :                                                .segment_close = wal_segment_close),
                                244                 :                :                                     NULL, NULL, NULL);
                                245                 :                : 
                                246                 :                :     /*
                                247                 :                :      * We don't have control over setting fast_forward, so at least check it.
                                248                 :                :      */
                                249         [ -  + ]:              3 :     Assert(!ctx->fast_forward);
                                250                 :                : 
                                251                 :                :     /* Avoid logical decoding of other relations. */
                                252                 :              3 :     rel = table_open(relid, AccessShareLock);
                                253                 :              3 :     repacked_rel_locator = rel->rd_locator;
                                254                 :              3 :     toastrelid = rel->rd_rel->reltoastrelid;
                                255         [ +  + ]:              3 :     if (OidIsValid(toastrelid))
                                256                 :                :     {
                                257                 :                :         Relation    toastrel;
                                258                 :                : 
                                259                 :                :         /* Avoid logical decoding of other TOAST relations. */
                                260                 :              1 :         toastrel = table_open(toastrelid, AccessShareLock);
                                261                 :              1 :         repacked_rel_toast_locator = toastrel->rd_locator;
                                262                 :              1 :         table_close(toastrel, AccessShareLock);
                                263                 :                :     }
                                264                 :              3 :     table_close(rel, AccessShareLock);
                                265                 :                : 
                                266                 :              3 :     DecodingContextFindStartpoint(ctx);
                                267                 :                : 
                                268                 :                :     /*
                                269                 :                :      * decode_concurrent_changes() needs a non-blocking callback.
                                270                 :                :      */
                                271                 :              3 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
                                272                 :                : 
                                273                 :                :     /* Some WAL records should have been read. */
   19 fujii@postgresql.org      274         [ -  + ]:              3 :     Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
                                275                 :                : 
                                276                 :                :     /*
                                277                 :                :      * Initialize repack_current_segment so that we can notice WAL segment
                                278                 :                :      * boundaries.
                                279                 :                :      */
   29 alvherre@kurilemu.de      280                 :              3 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
                                281                 :                :                 wal_segment_size);
                                282                 :                : 
                                283                 :                :     /* Our private state belongs to the decoding context. */
                                284                 :              3 :     oldcxt = MemoryContextSwitchTo(ctx->context);
                                285                 :                : 
                                286                 :                :     /*
                                287                 :                :      * read_local_xlog_page_no_wait() needs to be able to indicate the end of
                                288                 :                :      * WAL.
                                289                 :                :      */
                                290                 :              3 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
                                291                 :              3 :     dstate = palloc0_object(RepackDecodingState);
                                292                 :              3 :     MemoryContextSwitchTo(oldcxt);
                                293                 :                : 
                                294                 :                : #ifdef  USE_ASSERT_CHECKING
                                295                 :              3 :     dstate->relid = relid;
                                296                 :                : #endif
                                297                 :                : 
                                298                 :              3 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
                                299                 :                :                                                "REPACK - change",
                                300                 :                :                                                ALLOCSET_DEFAULT_SIZES);
                                301                 :                : 
                                302                 :                :     /* The file will be set as soon as we have it opened. */
                                303                 :              3 :     dstate->file = NULL;
                                304                 :                : 
                                305                 :                :     /*
                                306                 :                :      * Memory context and resource owner for long-lived resources.
                                307                 :                :      */
                                308                 :              3 :     dstate->worker_cxt = CurrentMemoryContext;
                                309                 :              3 :     dstate->worker_resowner = CurrentResourceOwner;
                                310                 :                : 
                                311                 :              3 :     ctx->output_writer_private = dstate;
                                312                 :                : 
                                313                 :              3 :     return ctx;
                                314                 :                : }
                                315                 :                : 
                                316                 :                : static void
                                317                 :              3 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
                                318                 :                : {
                                319                 :                :     RepackDecodingState *dstate;
                                320                 :                : 
                                321                 :              3 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                322         [ +  + ]:              3 :     if (dstate->slot)
                                323                 :              1 :         ExecDropSingleTupleTableSlot(dstate->slot);
                                324                 :                : 
                                325                 :              3 :     FreeDecodingContext(ctx);
                                326                 :              3 :     ReplicationSlotDropAcquired();
                                327                 :              3 : }
                                328                 :                : 
                                329                 :                : /*
                                330                 :                :  * Make snapshot available to the backend that launched the decoding worker.
                                331                 :                :  */
                                332                 :                : static void
                                333                 :              3 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
                                334                 :                : {
                                335                 :                :     char        fname[MAXPGPATH];
                                336                 :                :     BufFile    *file;
                                337                 :                :     Size        snap_size;
                                338                 :                :     char       *snap_space;
                                339                 :                : 
                                340                 :              3 :     snap_size = EstimateSnapshotSpace(snapshot);
                                341                 :              3 :     snap_space = (char *) palloc(snap_size);
                                342                 :              3 :     SerializeSnapshot(snapshot, snap_space);
                                343                 :                : 
                                344                 :              3 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                345                 :              3 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                346                 :                :     /* To make restoration easier, write the snapshot size first. */
                                347                 :              3 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
                                348                 :              3 :     BufFileWrite(file, snap_space, snap_size);
                                349                 :              3 :     BufFileClose(file);
   28                           350                 :              3 :     pfree(snap_space);
                                351                 :                : 
                                352                 :                :     /* Increase the counter to tell the backend that the file is available. */
   29                           353                 :              3 :     SpinLockAcquire(&shared->mutex);
                                354                 :              3 :     shared->last_exported++;
                                355                 :              3 :     SpinLockRelease(&shared->mutex);
                                356                 :              3 :     ConditionVariableSignal(&shared->cv);
                                357                 :              3 : }
                                358                 :                : 
                                359                 :                : /*
                                360                 :                :  * Decode logical changes from the WAL sequence and store them to a file.
                                361                 :                :  *
                                362                 :                :  * If true is returned, there is no more work for the worker.
                                363                 :                :  */
                                364                 :                : static bool
                                365                 :              6 : decode_concurrent_changes(LogicalDecodingContext *ctx,
                                366                 :                :                           DecodingWorkerShared *shared)
                                367                 :                : {
                                368                 :                :     RepackDecodingState *dstate;
                                369                 :                :     XLogRecPtr  lsn_upto;
                                370                 :                :     bool        done;
                                371                 :                :     char        fname[MAXPGPATH];
                                372                 :                : 
                                373                 :              6 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                374                 :                : 
                                375                 :                :     /* Open the output file. */
                                376                 :              6 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                377                 :              6 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                378                 :                : 
                                379                 :              6 :     SpinLockAcquire(&shared->mutex);
                                380                 :              6 :     lsn_upto = shared->lsn_upto;
                                381                 :              6 :     done = shared->done;
                                382                 :              6 :     SpinLockRelease(&shared->mutex);
                                383                 :                : 
                                384                 :                :     while (true)
                                385                 :            872 :     {
                                386                 :                :         XLogRecord *record;
                                387                 :                :         XLogSegNo   segno_new;
                                388                 :            878 :         char       *errm = NULL;
                                389                 :                :         XLogRecPtr  end_lsn;
                                390                 :                : 
                                391         [ -  + ]:            878 :         CHECK_FOR_INTERRUPTS();
                                392                 :                : 
                                393                 :            878 :         record = XLogReadRecord(ctx->reader, &errm);
                                394         [ +  + ]:            878 :         if (record)
                                395                 :                :         {
                                396                 :            863 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
                                397                 :                : 
                                398                 :                :             /*
                                399                 :                :              * If WAL segment boundary has been crossed, inform the decoding
                                400                 :                :              * system that the catalog_xmin can advance.
                                401                 :                :              */
                                402                 :            863 :             end_lsn = ctx->reader->EndRecPtr;
                                403                 :            863 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
                                404         [ -  + ]:            863 :             if (segno_new != repack_current_segment)
                                405                 :                :             {
   29 alvherre@kurilemu.de      406                 :UNC           0 :                 LogicalConfirmReceivedLocation(end_lsn);
                                407         [ #  # ]:              0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
                                408                 :                :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
                                409                 :              0 :                 repack_current_segment = segno_new;
                                410                 :                :             }
                                411                 :                :         }
                                412                 :                :         else
                                413                 :                :         {
                                414                 :                :             ReadLocalXLogPageNoWaitPrivate *priv;
                                415                 :                : 
   29 alvherre@kurilemu.de      416         [ -  + ]:GNC          15 :             if (errm)
   29 alvherre@kurilemu.de      417         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                418                 :                :                         errmsg("%s", errm));
                                419                 :                : 
                                420                 :                :             /*
                                421                 :                :              * In the decoding loop we do not want to get blocked when there
                                422                 :                :              * is no more WAL available, otherwise the loop would become
                                423                 :                :              * uninterruptible.
                                424                 :                :              */
   29 alvherre@kurilemu.de      425                 :GNC          15 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
                                426         [ +  - ]:             15 :             if (priv->end_of_wal)
                                427                 :                :                 /* Do not miss the end of WAL condition next time. */
                                428                 :             15 :                 priv->end_of_wal = false;
                                429                 :                :             else
   29 alvherre@kurilemu.de      430         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                431                 :                :                         errmsg("could not read WAL record"));
                                432                 :                :         }
                                433                 :                : 
                                434                 :                :         /*
                                435                 :                :          * Whether we could read new record or not, keep checking if
                                436                 :                :          * 'lsn_upto' was specified.
                                437                 :                :          */
   29 alvherre@kurilemu.de      438         [ +  + ]:GNC         878 :         if (!XLogRecPtrIsValid(lsn_upto))
                                439                 :                :         {
                                440                 :            633 :             SpinLockAcquire(&shared->mutex);
                                441                 :            633 :             lsn_upto = shared->lsn_upto;
                                442                 :                :             /* 'done' should be set at the same time as 'lsn_upto' */
                                443                 :            633 :             done = shared->done;
                                444                 :            633 :             SpinLockRelease(&shared->mutex);
                                445                 :                :         }
                                446         [ +  + ]:            878 :         if (XLogRecPtrIsValid(lsn_upto) &&
                                447         [ +  + ]:            251 :             ctx->reader->EndRecPtr >= lsn_upto)
                                448                 :              6 :             break;
                                449                 :                : 
                                450         [ +  + ]:            872 :         if (record == NULL)
                                451                 :                :         {
                                452                 :             13 :             int64       timeout = 0;
                                453                 :                :             WaitLSNResult res;
                                454                 :                : 
                                455                 :                :             /*
                                456                 :                :              * Before we retry reading, wait until new WAL is flushed.
                                457                 :                :              *
                                458                 :                :              * There is a race condition such that the backend executing
                                459                 :                :              * REPACK determines 'lsn_upto', but before it sets the shared
                                460                 :                :              * variable, we reach the end of WAL. In that case we'd need to
                                461                 :                :              * wait until the next WAL flush (unrelated to REPACK). Although
                                462                 :                :              * that should not be a problem in a busy system, it might be
                                463                 :                :              * noticeable in other cases, including regression tests (which
                                464                 :                :              * are not necessarily executed in parallel). Therefore it makes
                                465                 :                :              * sense to use timeout.
                                466                 :                :              *
                                467                 :                :              * If lsn_upto is valid, WAL records having LSN lower than that
                                468                 :                :              * should already have been flushed to disk.
                                469                 :                :              */
                                470         [ +  - ]:             13 :             if (!XLogRecPtrIsValid(lsn_upto))
                                471                 :             13 :                 timeout = 100L;
                                472                 :             13 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
                                473                 :             13 :                              ctx->reader->EndRecPtr + 1,
                                474                 :                :                              timeout);
                                475   [ +  +  -  + ]:             13 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
                                476                 :                :                 res != WAIT_LSN_RESULT_TIMEOUT)
   29 alvherre@kurilemu.de      477         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                478                 :                :                         errmsg("waiting for WAL failed"));
                                479                 :                :         }
                                480                 :                :     }
                                481                 :                : 
                                482                 :                :     /*
                                483                 :                :      * Close the file so we can make it available to the backend.
                                484                 :                :      */
   29 alvherre@kurilemu.de      485                 :GNC           6 :     BufFileClose(dstate->file);
                                486                 :              6 :     dstate->file = NULL;
                                487                 :              6 :     SpinLockAcquire(&shared->mutex);
                                488                 :              6 :     shared->lsn_upto = InvalidXLogRecPtr;
                                489                 :              6 :     shared->last_exported++;
                                490                 :              6 :     SpinLockRelease(&shared->mutex);
                                491                 :              6 :     ConditionVariableSignal(&shared->cv);
                                492                 :                : 
                                493                 :              6 :     return done;
                                494                 :                : }
                                495                 :                : 
                                496                 :                : /*
                                497                 :                :  * Does the WAL record contain a data change that this backend does not need
                                498                 :                :  * to decode on behalf of REPACK (CONCURRENTLY)?
                                499                 :                :  */
                                500                 :                : bool
                                501                 :        1445965 : change_useless_for_repack(XLogRecordBuffer *buf)
                                502                 :                : {
                                503                 :        1445965 :     XLogReaderState *r = buf->record;
                                504                 :                :     RelFileLocator locator;
                                505                 :                : 
                                506                 :                :     /* TOAST locator should not be set unless the main is. */
                                507   [ +  +  -  + ]:        1445965 :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
                                508                 :                :            OidIsValid(repacked_rel_locator.relNumber));
                                509                 :                : 
                                510                 :                :     /*
                                511                 :                :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
                                512                 :                :      * filtering.
                                513                 :                :      */
                                514         [ +  + ]:        1445965 :     if (!OidIsValid(repacked_rel_locator.relNumber))
                                515                 :        1445629 :         return false;
                                516                 :                : 
                                517                 :                :     /*
                                518                 :                :      * If the record does not contain the block 0, it's probably not INSERT /
                                519                 :                :      * UPDATE / DELETE. In any case, we do not have enough information to
                                520                 :                :      * filter the change out.
                                521                 :                :      */
                                522         [ -  + ]:            336 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
   29 alvherre@kurilemu.de      523                 :UNC           0 :         return false;
                                524                 :                : 
                                525                 :                :     /*
                                526                 :                :      * Decode the change if it belongs to the table we are repacking, or if it
                                527                 :                :      * belongs to its TOAST relation.
                                528                 :                :      */
   29 alvherre@kurilemu.de      529   [ +  +  +  -  :GNC         336 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
                                              +  - ]
                                530                 :             31 :         return false;
                                531         [ +  + ]:            305 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
                                532   [ +  +  +  -  :            240 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
                                              +  - ]
                                533                 :             44 :         return false;
                                534                 :                : 
                                535                 :                :     /* Filter out changes of other tables. */
                                536                 :            261 :     return true;
                                537                 :                : }
        

Generated by: LCOV version 2.5.0-beta