LCOV - differential code coverage report
Current view: top level - src/backend/commands - repack_worker.c (source / functions) Coverage Total Hit UNC GNC
Current: 380a8b2ea024c33a35e7abc8628e7c4f52f9f9f9 vs db5ed03217b9c238703df8b4b286115d6e940488 Lines: 93.9 % 165 155 10 155
Current Date: 2026-05-29 21:51:00 -0400 Functions: 100.0 % 8 8 8
Baseline: lcov-20260530-034037-baseline Branches: 62.2 % 74 46 28 46
Baseline Date: 2026-05-29 14:39:03 -0700 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
[..1] days: 0.0 % 1 0 1
(1,7] days: 100.0 % 1 1 1
(30,360] days: 94.5 % 163 154 9 154
Function coverage date bins:
(30,360] days: 100.0 % 8 8 8
Branch coverage date bins:
(30,360] days: 62.2 % 74 46 28 46

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * repack_worker.c
                                  4                 :                :  *    Implementation of the background worker for ad-hoc logical decoding
                                  5                 :                :  *    during REPACK (CONCURRENTLY).
                                  6                 :                :  *
                                  7                 :                :  *
                                  8                 :                :  * Copyright (c) 2026, PostgreSQL Global Development Group
                                  9                 :                :  *
                                 10                 :                :  *
                                 11                 :                :  * IDENTIFICATION
                                 12                 :                :  *    src/backend/commands/repack_worker.c
                                 13                 :                :  *
                                 14                 :                :  *-------------------------------------------------------------------------
                                 15                 :                :  */
                                 16                 :                : #include "postgres.h"
                                 17                 :                : 
                                 18                 :                : #include "access/table.h"
                                 19                 :                : #include "access/xlog_internal.h"
                                 20                 :                : #include "access/xlogutils.h"
                                 21                 :                : #include "access/xlogwait.h"
                                 22                 :                : #include "commands/repack.h"
                                 23                 :                : #include "commands/repack_internal.h"
                                 24                 :                : #include "libpq/pqmq.h"
                                 25                 :                : #include "replication/snapbuild.h"
                                 26                 :                : #include "storage/ipc.h"
                                 27                 :                : #include "storage/proc.h"
                                 28                 :                : #include "tcop/tcopprot.h"
                                 29                 :                : #include "utils/memutils.h"
                                 30                 :                : 
                                 31                 :                : #define REPL_PLUGIN_NAME   "pgrepack"
                                 32                 :                : 
                                 33                 :                : static void RepackWorkerShutdown(int code, Datum arg);
                                 34                 :                : static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid);
                                 35                 :                : static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx);
                                 36                 :                : static void export_initial_snapshot(Snapshot snapshot,
                                 37                 :                :                                     DecodingWorkerShared *shared);
                                 38                 :                : static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
                                 39                 :                :                                       DecodingWorkerShared *shared);
                                 40                 :                : 
                                 41                 :                : /* Is this process a REPACK worker? */
                                 42                 :                : static bool am_repack_worker = false;
                                 43                 :                : 
                                 44                 :                : /* The WAL segment being decoded. */
                                 45                 :                : static XLogSegNo repack_current_segment = 0;
                                 46                 :                : 
                                 47                 :                : /* Our DSM segment, for shutting down */
                                 48                 :                : static dsm_segment *worker_dsm_segment = NULL;
                                 49                 :                : 
                                 50                 :                : /*
                                 51                 :                :  * Keep track of the table we're processing, to skip logical decoding of data
                                 52                 :                :  * from other relations.
                                 53                 :                :  */
                                 54                 :                : static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
                                 55                 :                : static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
                                 56                 :                : 
                                 57                 :                : 
                                 58                 :                : /* REPACK decoding worker entry point */
                                 59                 :                : void
   54 alvherre@kurilemu.de       60                 :GNC           7 : RepackWorkerMain(Datum main_arg)
                                 61                 :                : {
                                 62                 :                :     dsm_segment *seg;
                                 63                 :                :     DecodingWorkerShared *shared;
                                 64                 :                :     shm_mq     *mq;
                                 65                 :                :     shm_mq_handle *mqh;
                                 66                 :                :     LogicalDecodingContext *decoding_ctx;
                                 67                 :                :     SharedFileSet *sfs;
                                 68                 :                :     Snapshot    snapshot;
                                 69                 :                : 
                                 70                 :              7 :     am_repack_worker = true;
                                 71                 :                : 
                                 72                 :              7 :     BackgroundWorkerUnblockSignals();
                                 73                 :                : 
                                 74                 :              7 :     seg = dsm_attach(DatumGetUInt32(main_arg));
                                 75         [ -  + ]:              7 :     if (seg == NULL)
   54 alvherre@kurilemu.de       76         [ #  # ]:UNC           0 :         ereport(ERROR,
                                 77                 :                :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 78                 :                :                 errmsg("could not map dynamic shared memory segment"));
   44 alvherre@kurilemu.de       79                 :GNC           7 :     worker_dsm_segment = seg;
                                 80                 :                : 
   54                            81                 :              7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                 82                 :                : 
                                 83                 :                :     /* Arrange to signal the leader if we exit. */
                                 84                 :              7 :     before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(shared));
                                 85                 :                : 
                                 86                 :                :     /*
                                 87                 :                :      * Join locking group - see the comments around the call of
                                 88                 :                :      * start_repack_decoding_worker().
                                 89                 :                :      */
                                 90         [ -  + ]:              7 :     if (!BecomeLockGroupMember(shared->backend_proc, shared->backend_pid))
   54 alvherre@kurilemu.de       91                 :UNC           0 :         return;                 /* The leader is not running anymore. */
                                 92                 :                : 
                                 93                 :                :     /*
                                 94                 :                :      * Setup a queue to send error messages to the backend that launched this
                                 95                 :                :      * worker.
                                 96                 :                :      */
   54 alvherre@kurilemu.de       97                 :GNC           7 :     mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
                                 98                 :              7 :     shm_mq_set_sender(mq, MyProc);
                                 99                 :              7 :     mqh = shm_mq_attach(mq, seg, NULL);
                                100                 :              7 :     pq_redirect_to_shm_mq(seg, mqh);
                                101                 :              7 :     pq_set_parallel_leader(shared->backend_pid,
                                102                 :                :                            shared->backend_proc_number);
                                103                 :                : 
                                104                 :                :     /* Connect to the database. LOGIN is not required. */
   40                           105                 :              7 :     BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid,
                                106                 :                :                                               BGWORKER_BYPASS_ROLELOGINCHECK);
                                107                 :                : 
                                108                 :                :     /*
                                109                 :                :      * Transaction is needed to open relation, and it also provides us with a
                                110                 :                :      * resource owner.
                                111                 :                :      */
   54                           112                 :              7 :     StartTransactionCommand();
                                113                 :                : 
                                114                 :              7 :     shared = (DecodingWorkerShared *) dsm_segment_address(seg);
                                115                 :                : 
                                116                 :                :     /*
                                117                 :                :      * Not sure the spinlock is needed here - the backend should not change
                                118                 :                :      * anything in the shared memory until we have serialized the snapshot.
                                119                 :                :      */
                                120                 :              7 :     SpinLockAcquire(&shared->mutex);
                                121         [ -  + ]:              7 :     Assert(!XLogRecPtrIsValid(shared->lsn_upto));
                                122                 :              7 :     sfs = &shared->sfs;
                                123                 :              7 :     SpinLockRelease(&shared->mutex);
                                124                 :                : 
                                125                 :              7 :     SharedFileSetAttach(sfs, seg);
                                126                 :                : 
                                127                 :                :     /*
                                128                 :                :      * Prepare to capture the concurrent data changes ourselves.
                                129                 :                :      */
                                130                 :              7 :     decoding_ctx = repack_setup_logical_decoding(shared->relid);
                                131                 :                : 
                                132                 :                :     /* Announce that we're ready. */
                                133                 :              7 :     SpinLockAcquire(&shared->mutex);
                                134                 :              7 :     shared->initialized = true;
                                135                 :              7 :     SpinLockRelease(&shared->mutex);
                                136                 :              7 :     ConditionVariableSignal(&shared->cv);
                                137                 :                : 
                                138                 :                :     /* There doesn't seem to a nice API to set these */
                                139                 :              7 :     XactIsoLevel = XACT_REPEATABLE_READ;
                                140                 :              7 :     XactReadOnly = true;
                                141                 :                : 
                                142                 :                :     /* Build the initial snapshot and export it. */
                                143                 :              7 :     snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder);
                                144                 :              7 :     export_initial_snapshot(snapshot, shared);
                                145                 :                : 
                                146                 :                :     /*
                                147                 :                :      * Only historic snapshots should be used now. Do not let us restrict the
                                148                 :                :      * progress of xmin horizon.
                                149                 :                :      */
                                150                 :              7 :     InvalidateCatalogSnapshot();
                                151                 :                : 
                                152                 :                :     for (;;)
                                153                 :              7 :     {
                                154                 :             14 :         bool        stop = decode_concurrent_changes(decoding_ctx, shared);
                                155                 :                : 
                                156         [ +  + ]:             14 :         if (stop)
                                157                 :              7 :             break;
                                158                 :                : 
                                159                 :                :     }
                                160                 :                : 
                                161                 :                :     /* Cleanup. */
                                162                 :              7 :     repack_cleanup_logical_decoding(decoding_ctx);
                                163                 :              7 :     CommitTransactionCommand();
                                164                 :                : }
                                165                 :                : 
                                166                 :                : /*
                                167                 :                :  * See ParallelWorkerShutdown for details.
                                168                 :                :  */
                                169                 :                : static void
                                170                 :              7 : RepackWorkerShutdown(int code, Datum arg)
                                171                 :                : {
                                172                 :              7 :     DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
                                173                 :                : 
                                174                 :              7 :     SendProcSignal(shared->backend_pid,
                                175                 :                :                    PROCSIG_REPACK_MESSAGE,
                                176                 :                :                    shared->backend_proc_number);
                                177                 :                : 
   44                           178                 :              7 :     dsm_detach(worker_dsm_segment);
   54                           179                 :              7 : }
                                180                 :                : 
                                181                 :                : bool
                                182                 :           2012 : AmRepackWorker(void)
                                183                 :                : {
                                184                 :           2012 :     return am_repack_worker;
                                185                 :                : }
                                186                 :                : 
                                187                 :                : /*
                                188                 :                :  * This function is much like pg_create_logical_replication_slot() except that
                                189                 :                :  * the new slot is neither released (if anyone else could read changes from
                                190                 :                :  * our slot, we could miss changes other backends do while we copy the
                                191                 :                :  * existing data into temporary table), nor persisted (it's easier to handle
                                192                 :                :  * crash by restarting all the work from scratch).
                                193                 :                :  */
                                194                 :                : static LogicalDecodingContext *
                                195                 :              7 : repack_setup_logical_decoding(Oid relid)
                                196                 :                : {
                                197                 :                :     Relation    rel;
                                198                 :                :     Oid         toastrelid;
                                199                 :                :     LogicalDecodingContext *ctx;
                                200                 :                :     NameData    slotname;
                                201                 :                :     RepackDecodingState *dstate;
                                202                 :                :     MemoryContext oldcxt;
                                203                 :                : 
                                204                 :                :     /*
                                205                 :                :      * REPACK CONCURRENTLY is not allowed in a transaction block, so this
                                206                 :                :      * should never fire.
                                207                 :                :      */
                                208         [ -  + ]:              7 :     Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
                                209                 :                : 
                                210                 :                :     /*
                                211                 :                :      * Make sure we can use logical decoding.
                                212                 :                :      */
   53                           213                 :              7 :     CheckLogicalDecodingRequirements(true);
                                214                 :                : 
                                215                 :                :     /*
                                216                 :                :      * A single backend should not execute multiple REPACK commands at a time,
                                217                 :                :      * so use PID to make the slot unique.
                                218                 :                :      *
                                219                 :                :      * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
                                220                 :                :      */
   54                           221                 :              7 :     snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
   53                           222                 :              7 :     ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
                                223                 :                :                           false, false);
                                224                 :                : 
   54                           225                 :              7 :     EnsureLogicalDecodingEnabled();
                                226                 :                : 
                                227                 :                :     /*
                                228                 :                :      * Neither prepare_write nor do_write callback nor update_progress is
                                229                 :                :      * useful for us.
                                230                 :                :      */
                                231                 :              7 :     ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
                                232                 :                :                                     NIL,
                                233                 :                :                                     true,
                                234                 :                :                                     true,
                                235                 :                :                                     InvalidXLogRecPtr,
                                236                 :              7 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
                                237                 :                :                                                .segment_open = wal_segment_open,
                                238                 :                :                                                .segment_close = wal_segment_close),
                                239                 :                :                                     NULL, NULL, NULL);
                                240                 :                : 
                                241                 :                :     /*
                                242                 :                :      * We don't have control on setting fast_forward, so at least check it.
                                243                 :                :      */
                                244         [ -  + ]:              7 :     Assert(!ctx->fast_forward);
                                245                 :                : 
                                246                 :                :     /* Avoid logical decoding of other relations. */
                                247                 :              7 :     rel = table_open(relid, AccessShareLock);
                                248                 :              7 :     repacked_rel_locator = rel->rd_locator;
                                249                 :              7 :     toastrelid = rel->rd_rel->reltoastrelid;
                                250         [ +  + ]:              7 :     if (OidIsValid(toastrelid))
                                251                 :                :     {
                                252                 :                :         Relation    toastrel;
                                253                 :                : 
                                254                 :                :         /* Avoid logical decoding of other TOAST relations. */
                                255                 :              3 :         toastrel = table_open(toastrelid, AccessShareLock);
                                256                 :              3 :         repacked_rel_toast_locator = toastrel->rd_locator;
                                257                 :              3 :         table_close(toastrel, AccessShareLock);
                                258                 :                :     }
                                259                 :              7 :     table_close(rel, AccessShareLock);
                                260                 :                : 
                                261                 :              7 :     DecodingContextFindStartpoint(ctx);
                                262                 :                : 
                                263                 :                :     /*
                                264                 :                :      * decode_concurrent_changes() needs non-blocking callback.
                                265                 :                :      */
                                266                 :              7 :     ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
                                267                 :                : 
                                268                 :                :     /* Some WAL records should have been read. */
   44 fujii@postgresql.org      269         [ -  + ]:              7 :     Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
                                270                 :                : 
                                271                 :                :     /*
                                272                 :                :      * Initialize repack_current_segment so that we can notice WAL segment
                                273                 :                :      * boundaries.
                                274                 :                :      */
   54 alvherre@kurilemu.de      275                 :              7 :     XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
                                276                 :                :                 wal_segment_size);
                                277                 :                : 
                                278                 :                :     /* Our private state belongs to the decoding context. */
                                279                 :              7 :     oldcxt = MemoryContextSwitchTo(ctx->context);
                                280                 :                : 
                                281                 :                :     /*
                                282                 :                :      * read_local_xlog_page_no_wait() needs to be able to indicate the end of
                                283                 :                :      * WAL.
                                284                 :                :      */
                                285                 :              7 :     ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
                                286                 :              7 :     dstate = palloc0_object(RepackDecodingState);
                                287                 :              7 :     MemoryContextSwitchTo(oldcxt);
                                288                 :                : 
                                289                 :                : #ifdef  USE_ASSERT_CHECKING
                                290                 :              7 :     dstate->relid = relid;
                                291                 :                : #endif
                                292                 :                : 
                                293                 :              7 :     dstate->change_cxt = AllocSetContextCreate(ctx->context,
                                294                 :                :                                                "REPACK - change",
                                295                 :                :                                                ALLOCSET_DEFAULT_SIZES);
                                296                 :                : 
                                297                 :                :     /* The file will be set as soon as we have it opened. */
                                298                 :              7 :     dstate->file = NULL;
                                299                 :                : 
                                300                 :                :     /*
                                301                 :                :      * Memory context and resource owner for long-lived resources.
                                302                 :                :      */
                                303                 :              7 :     dstate->worker_cxt = CurrentMemoryContext;
                                304                 :              7 :     dstate->worker_resowner = CurrentResourceOwner;
                                305                 :                : 
                                306                 :              7 :     ctx->output_writer_private = dstate;
                                307                 :                : 
                                308                 :              7 :     return ctx;
                                309                 :                : }
                                310                 :                : 
                                311                 :                : static void
                                312                 :              7 : repack_cleanup_logical_decoding(LogicalDecodingContext *ctx)
                                313                 :                : {
                                314                 :                :     RepackDecodingState *dstate;
                                315                 :                : 
                                316                 :              7 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                317         [ +  + ]:              7 :     if (dstate->slot)
                                318                 :              1 :         ExecDropSingleTupleTableSlot(dstate->slot);
                                319                 :                : 
                                320                 :              7 :     FreeDecodingContext(ctx);
    3                           321                 :              7 :     ReplicationSlotDropAcquired(true);
   54                           322                 :              7 : }
                                323                 :                : 
                                324                 :                : /*
                                325                 :                :  * Make snapshot available to the backend that launched the decoding worker.
                                326                 :                :  */
                                327                 :                : static void
                                328                 :              7 : export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
                                329                 :                : {
                                330                 :                :     char        fname[MAXPGPATH];
                                331                 :                :     BufFile    *file;
                                332                 :                :     Size        snap_size;
                                333                 :                :     char       *snap_space;
                                334                 :                : 
                                335                 :              7 :     snap_size = EstimateSnapshotSpace(snapshot);
                                336                 :              7 :     snap_space = (char *) palloc(snap_size);
                                337                 :              7 :     SerializeSnapshot(snapshot, snap_space);
                                338                 :                : 
                                339                 :              7 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                340                 :              7 :     file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                341                 :                :     /* To make restoration easier, write the snapshot size first. */
                                342                 :              7 :     BufFileWrite(file, &snap_size, sizeof(snap_size));
                                343                 :              7 :     BufFileWrite(file, snap_space, snap_size);
                                344                 :              7 :     BufFileClose(file);
   53                           345                 :              7 :     pfree(snap_space);
                                346                 :                : 
                                347                 :                :     /* Increase the counter to tell the backend that the file is available. */
   54                           348                 :              7 :     SpinLockAcquire(&shared->mutex);
                                349                 :              7 :     shared->last_exported++;
                                350                 :              7 :     SpinLockRelease(&shared->mutex);
                                351                 :              7 :     ConditionVariableSignal(&shared->cv);
                                352                 :              7 : }
                                353                 :                : 
                                354                 :                : /*
                                355                 :                :  * Decode logical changes from the WAL sequence and store them to a file.
                                356                 :                :  *
                                357                 :                :  * If true is returned, there is no more work for the worker.
                                358                 :                :  */
                                359                 :                : static bool
                                360                 :             14 : decode_concurrent_changes(LogicalDecodingContext *ctx,
                                361                 :                :                           DecodingWorkerShared *shared)
                                362                 :                : {
                                363                 :                :     RepackDecodingState *dstate;
                                364                 :                :     XLogRecPtr  lsn_upto;
                                365                 :                :     bool        done;
                                366                 :                :     char        fname[MAXPGPATH];
                                367                 :                : 
                                368                 :             14 :     dstate = (RepackDecodingState *) ctx->output_writer_private;
                                369                 :                : 
                                370                 :                :     /* Open the output file. */
                                371                 :             14 :     DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
                                372                 :             14 :     dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
                                373                 :                : 
                                374                 :             14 :     SpinLockAcquire(&shared->mutex);
                                375                 :             14 :     lsn_upto = shared->lsn_upto;
                                376                 :             14 :     done = shared->done;
                                377                 :             14 :     SpinLockRelease(&shared->mutex);
                                378                 :                : 
                                379                 :                :     while (true)
                                380                 :           1468 :     {
                                381                 :                :         XLogRecord *record;
                                382                 :                :         XLogSegNo   segno_new;
                                383                 :           1482 :         char       *errm = NULL;
                                384                 :                :         XLogRecPtr  end_lsn;
                                385                 :                : 
                                386         [ -  + ]:           1482 :         CHECK_FOR_INTERRUPTS();
                                387                 :                : 
                                388                 :           1482 :         record = XLogReadRecord(ctx->reader, &errm);
                                389         [ +  + ]:           1482 :         if (record)
                                390                 :                :         {
                                391                 :           1445 :             LogicalDecodingProcessRecord(ctx, ctx->reader);
                                392                 :                : 
                                393                 :                :             /*
                                394                 :                :              * We want to allow WAL to be recycled while REPACK is running.
                                395                 :                :              *
                                396                 :                :              * In normal usage of a replication slot, we need to be very
                                397                 :                :              * careful not to advance the LSN until it's been confirmed as
                                398                 :                :              * received by the remote.  In REPACK's case, this is not needed:
                                399                 :                :              * REPACK will never try to replay the same WAL after a crash, and
                                400                 :                :              * if there _is_ a crash, the whole REPACK has to be started from
                                401                 :                :              * scratch anyway.
                                402                 :                :              *
                                403                 :                :              * So here we disregard the careful LSN tracking and just move the
                                404                 :                :              * LSN locations forward to what we've processed.  Note that it
                                405                 :                :              * would be bogus to move the xmin forward, though, so we don't
                                406                 :                :              * touch that.
                                407                 :                :              *
                                408                 :                :              * This can be done on whatever schedule is convenient, but in
                                409                 :                :              * order not to cause unnecessary load, we only do it as we cross
                                410                 :                :              * each WAL segment boundary.
                                411                 :                :              */
                                412                 :           1445 :             end_lsn = ctx->reader->EndRecPtr;
                                413                 :           1445 :             XLByteToSeg(end_lsn, segno_new, wal_segment_size);
                                414         [ -  + ]:           1445 :             if (segno_new != repack_current_segment)
                                415                 :                :             {
    0 alvherre@kurilemu.de      416                 :UNC           0 :                 LogicalIncreaseRestartDecodingForSlot(end_lsn, end_lsn);
   54                           417                 :              0 :                 LogicalConfirmReceivedLocation(end_lsn);
                                418         [ #  # ]:              0 :                 elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
                                419                 :                :                      (uint32) (end_lsn >> 32), (uint32) end_lsn);
                                420                 :              0 :                 repack_current_segment = segno_new;
                                421                 :                :             }
                                422                 :                :         }
                                423                 :                :         else
                                424                 :                :         {
                                425                 :                :             ReadLocalXLogPageNoWaitPrivate *priv;
                                426                 :                : 
   54 alvherre@kurilemu.de      427         [ -  + ]:GNC          37 :             if (errm)
   54 alvherre@kurilemu.de      428         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                429                 :                :                         errcode_for_file_access(),
                                430                 :                :                         errmsg("could not read WAL from timeline %u at %X/%08X: %s",
                                431                 :                :                                ctx->reader->currTLI,
                                432                 :                :                                LSN_FORMAT_ARGS(ctx->reader->EndRecPtr),
                                433                 :                :                                errm));
                                434                 :                : 
                                435                 :                :             /*
                                436                 :                :              * In the decoding loop we do not want to get blocked when there
                                437                 :                :              * is no more WAL available, otherwise the loop would become
                                438                 :                :              * uninterruptible.
                                439                 :                :              */
   54 alvherre@kurilemu.de      440                 :GNC          37 :             priv = (ReadLocalXLogPageNoWaitPrivate *) ctx->reader->private_data;
                                441         [ +  - ]:             37 :             if (priv->end_of_wal)
                                442                 :                :                 /* Do not miss the end of WAL condition next time. */
                                443                 :             37 :                 priv->end_of_wal = false;
                                444                 :                :             else
   54 alvherre@kurilemu.de      445         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                446                 :                :                         errcode(ERRCODE_DATA_CORRUPTED),
                                447                 :                :                         errmsg("could not read WAL record"));
                                448                 :                :         }
                                449                 :                : 
                                450                 :                :         /*
                                451                 :                :          * Whether we could read new record or not, keep checking if
                                452                 :                :          * 'lsn_upto' was specified.
                                453                 :                :          */
   54 alvherre@kurilemu.de      454         [ +  + ]:GNC        1482 :         if (!XLogRecPtrIsValid(lsn_upto))
                                455                 :                :         {
                                456                 :           1356 :             SpinLockAcquire(&shared->mutex);
                                457                 :           1356 :             lsn_upto = shared->lsn_upto;
                                458                 :                :             /* 'done' should be set at the same time as 'lsn_upto' */
                                459                 :           1356 :             done = shared->done;
                                460                 :           1356 :             SpinLockRelease(&shared->mutex);
                                461                 :                :         }
                                462         [ +  + ]:           1482 :         if (XLogRecPtrIsValid(lsn_upto) &&
                                463         [ +  + ]:            140 :             ctx->reader->EndRecPtr >= lsn_upto)
                                464                 :             14 :             break;
                                465                 :                : 
                                466         [ +  + ]:           1468 :         if (record == NULL)
                                467                 :                :         {
                                468                 :             29 :             int64       timeout = 0;
                                469                 :                :             WaitLSNResult res;
                                470                 :                : 
                                471                 :                :             /*
                                472                 :                :              * Before we retry reading, wait until new WAL is flushed.
                                473                 :                :              *
                                474                 :                :              * There is a race condition such that the backend executing
                                475                 :                :              * REPACK determines 'lsn_upto', but before it sets the shared
                                476                 :                :              * variable, we reach the end of WAL. In that case we'd need to
                                477                 :                :              * wait until the next WAL flush (unrelated to REPACK). Although
                                478                 :                :              * that should not be a problem in a busy system, it might be
                                479                 :                :              * noticeable in other cases, including regression tests (which
                                480                 :                :              * are not necessarily executed in parallel). Therefore it makes
                                481                 :                :              * sense to use timeout.
                                482                 :                :              *
                                483                 :                :              * If lsn_upto is valid, WAL records having LSN lower than that
                                484                 :                :              * should already have been flushed to disk.
                                485                 :                :              */
                                486         [ +  - ]:             29 :             if (!XLogRecPtrIsValid(lsn_upto))
                                487                 :             29 :                 timeout = 100L;
                                488                 :             29 :             res = WaitForLSN(WAIT_LSN_TYPE_PRIMARY_FLUSH,
                                489                 :             29 :                              ctx->reader->EndRecPtr + 1,
                                490                 :                :                              timeout);
                                491   [ +  +  -  + ]:             29 :             if (res != WAIT_LSN_RESULT_SUCCESS &&
                                492                 :                :                 res != WAIT_LSN_RESULT_TIMEOUT)
   54 alvherre@kurilemu.de      493         [ #  # ]:UNC           0 :                 ereport(ERROR,
                                494                 :                :                         errcode(ERRCODE_INTERNAL_ERROR),
                                495                 :                :                         errmsg("waiting for WAL failed"));
                                496                 :                :         }
                                497                 :                :     }
                                498                 :                : 
                                499                 :                :     /*
                                500                 :                :      * Close the file so we can make it available to the backend.
                                501                 :                :      */
   54 alvherre@kurilemu.de      502                 :GNC          14 :     BufFileClose(dstate->file);
                                503                 :             14 :     dstate->file = NULL;
                                504                 :             14 :     SpinLockAcquire(&shared->mutex);
                                505                 :             14 :     shared->lsn_upto = InvalidXLogRecPtr;
                                506                 :             14 :     shared->last_exported++;
                                507                 :             14 :     SpinLockRelease(&shared->mutex);
                                508                 :             14 :     ConditionVariableSignal(&shared->cv);
                                509                 :                : 
                                510                 :             14 :     return done;
                                511                 :                : }
                                512                 :                : 
                                513                 :                : /*
                                514                 :                :  * Does the WAL record contain a data change that this backend does not need
                                515                 :                :  * to decode on behalf of REPACK (CONCURRENTLY)?
                                516                 :                :  */
                                517                 :                : bool
                                518                 :        1398389 : change_useless_for_repack(XLogRecordBuffer *buf)
                                519                 :                : {
                                520                 :        1398389 :     XLogReaderState *r = buf->record;
                                521                 :                :     RelFileLocator locator;
                                522                 :                : 
                                523                 :                :     /* TOAST locator should not be set unless the main is. */
                                524   [ +  +  -  + ]:        1398389 :     Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) ||
                                525                 :                :            OidIsValid(repacked_rel_locator.relNumber));
                                526                 :                : 
                                527                 :                :     /*
                                528                 :                :      * Backends not involved in REPACK (CONCURRENTLY) should not do the
                                529                 :                :      * filtering.
                                530                 :                :      */
                                531         [ +  + ]:        1398389 :     if (!OidIsValid(repacked_rel_locator.relNumber))
                                532                 :        1397959 :         return false;
                                533                 :                : 
                                534                 :                :     /*
                                535                 :                :      * If the record does not contain the block 0, it's probably not INSERT /
                                536                 :                :      * UPDATE / DELETE. In any case, we do not have enough information to
                                537                 :                :      * filter the change out.
                                538                 :                :      */
                                539         [ -  + ]:            430 :     if (!XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL))
   54 alvherre@kurilemu.de      540                 :UNC           0 :         return false;
                                541                 :                : 
                                542                 :                :     /*
                                543                 :                :      * Decode the change if it belongs to the table we are repacking, or if it
                                544                 :                :      * belongs to its TOAST relation.
                                545                 :                :      */
   54 alvherre@kurilemu.de      546   [ +  +  +  -  :GNC         430 :     if (RelFileLocatorEquals(locator, repacked_rel_locator))
                                              +  - ]
                                547                 :             33 :         return false;
                                548         [ +  + ]:            397 :     if (OidIsValid(repacked_rel_toast_locator.relNumber) &&
                                549   [ +  +  +  -  :            299 :         RelFileLocatorEquals(locator, repacked_rel_toast_locator))
                                              +  - ]
                                550                 :             44 :         return false;
                                551                 :                : 
                                552                 :                :     /* Filter out changes of other tables. */
                                553                 :            353 :     return true;
                                554                 :                : }
        

Generated by: LCOV version 2.5.0-beta