LCOV - differential code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Coverage Total Hit UBC CBC
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 100.0 % 8 8 8
Current Date: 2025-09-06 07:49:51 +0900 Functions: 100.0 % 3 3 3
Baseline: lcov-20250906-005545-baseline Branches: 66.7 % 12 8 4 8
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(360..) days: 100.0 % 8 8 8
Function coverage date bins:
(360..) days: 100.0 % 3 3 3
Branch coverage date bins:
(360..) days: 66.7 % 12 8 4 8

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * worker_internal.h
                                  4                 :                :  *    Internal headers shared by logical replication workers.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 2016-2025, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * src/include/replication/worker_internal.h
                                  9                 :                :  *
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : #ifndef WORKER_INTERNAL_H
                                 13                 :                : #define WORKER_INTERNAL_H
                                 14                 :                : 
                                 15                 :                : #include "access/xlogdefs.h"
                                 16                 :                : #include "catalog/pg_subscription.h"
                                 17                 :                : #include "datatype/timestamp.h"
                                 18                 :                : #include "miscadmin.h"
                                 19                 :                : #include "replication/logicalrelation.h"
                                 20                 :                : #include "replication/walreceiver.h"
                                 21                 :                : #include "storage/buffile.h"
                                 22                 :                : #include "storage/fileset.h"
                                 23                 :                : #include "storage/lock.h"
                                 24                 :                : #include "storage/shm_mq.h"
                                 25                 :                : #include "storage/shm_toc.h"
                                 26                 :                : #include "storage/spin.h"
                                 27                 :                : 
                                 28                 :                : /* Different types of worker */
                                 29                 :                : typedef enum LogicalRepWorkerType
                                 30                 :                : {
                                 31                 :                :     WORKERTYPE_UNKNOWN = 0,
                                 32                 :                :     WORKERTYPE_TABLESYNC,
                                 33                 :                :     WORKERTYPE_APPLY,
                                 34                 :                :     WORKERTYPE_PARALLEL_APPLY,
                                 35                 :                : } LogicalRepWorkerType;
                                 36                 :                : 
                                 37                 :                : typedef struct LogicalRepWorker
                                 38                 :                : {
                                 39                 :                :     /* What type of worker is this? */
                                 40                 :                :     LogicalRepWorkerType type;
                                 41                 :                : 
                                 42                 :                :     /* Time at which this worker was launched. */
                                 43                 :                :     TimestampTz launch_time;
                                 44                 :                : 
                                 45                 :                :     /* Indicates if this slot is used or free. */
                                 46                 :                :     bool        in_use;
                                 47                 :                : 
                                 48                 :                :     /* Increased every time the slot is taken by new worker. */
                                 49                 :                :     uint16      generation;
                                 50                 :                : 
                                 51                 :                :     /* Pointer to proc array. NULL if not running. */
                                 52                 :                :     PGPROC     *proc;
                                 53                 :                : 
                                 54                 :                :     /* Database id to connect to. */
                                 55                 :                :     Oid         dbid;
                                 56                 :                : 
                                 57                 :                :     /* User to use for connection (will be same as owner of subscription). */
                                 58                 :                :     Oid         userid;
                                 59                 :                : 
                                 60                 :                :     /* Subscription id for the worker. */
                                 61                 :                :     Oid         subid;
                                 62                 :                : 
                                 63                 :                :     /* Used for initial table synchronization. */
                                 64                 :                :     Oid         relid;
                                 65                 :                :     char        relstate;
                                 66                 :                :     XLogRecPtr  relstate_lsn;
                                 67                 :                :     slock_t     relmutex;
                                 68                 :                : 
                                 69                 :                :     /*
                                 70                 :                :      * Used to create the changes and subxact files for the streaming
                                 71                 :                :      * transactions.  Upon the arrival of the first streaming transaction or
                                 72                 :                :      * when the first-time leader apply worker times out while sending changes
                                 73                 :                :      * to the parallel apply worker, the fileset will be initialized, and it
                                 74                 :                :      * will be deleted when the worker exits.  Under this, separate buffiles
                                 75                 :                :      * would be created for each transaction which will be deleted after the
                                 76                 :                :      * transaction is finished.
                                 77                 :                :      */
                                 78                 :                :     FileSet    *stream_fileset;
                                 79                 :                : 
                                 80                 :                :     /*
                                 81                 :                :      * PID of leader apply worker if this slot is used for a parallel apply
                                 82                 :                :      * worker, InvalidPid otherwise.
                                 83                 :                :      */
                                 84                 :                :     pid_t       leader_pid;
                                 85                 :                : 
                                 86                 :                :     /* Indicates whether apply can be performed in parallel. */
                                 87                 :                :     bool        parallel_apply;
                                 88                 :                : 
                                 89                 :                :     /*
                                 90                 :                :      * Changes made by this transaction and subsequent ones must be preserved.
                                 91                 :                :      * This ensures that update_deleted conflicts can be accurately detected
                                 92                 :                :      * during the apply phase of logical replication by this worker.
                                 93                 :                :      *
                                 94                 :                :      * The logical replication launcher manages an internal replication slot
                                 95                 :                :      * named "pg_conflict_detection". It asynchronously collects this ID to
                                 96                 :                :      * decide when to advance the xmin value of the slot.
                                 97                 :                :      *
                                 98                 :                :      * This ID is set to InvalidTransactionId when the apply worker stops
                                 99                 :                :      * retaining information needed for conflict detection.
                                100                 :                :      */
                                101                 :                :     TransactionId oldest_nonremovable_xid;
                                102                 :                : 
                                103                 :                :     /* Stats. */
                                104                 :                :     XLogRecPtr  last_lsn;
                                105                 :                :     TimestampTz last_send_time;
                                106                 :                :     TimestampTz last_recv_time;
                                107                 :                :     XLogRecPtr  reply_lsn;
                                108                 :                :     TimestampTz reply_time;
                                109                 :                : } LogicalRepWorker;
                                110                 :                : 
                                111                 :                : /*
                                112                 :                :  * State of the transaction in parallel apply worker.
                                113                 :                :  *
                                114                 :                :  * The enum values must have the same order as the transaction state
                                115                 :                :  * transitions.
                                116                 :                :  */
                                117                 :                : typedef enum ParallelTransState
                                118                 :                : {
                                119                 :                :     PARALLEL_TRANS_UNKNOWN,
                                120                 :                :     PARALLEL_TRANS_STARTED,
                                121                 :                :     PARALLEL_TRANS_FINISHED,
                                122                 :                : } ParallelTransState;
                                123                 :                : 
                                124                 :                : /*
                                125                 :                :  * State of fileset used to communicate changes from leader to parallel
                                126                 :                :  * apply worker.
                                127                 :                :  *
                                128                 :                :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
                                129                 :                :  * the file to communicate with the parallel apply worker.
                                130                 :                :  *
                                131                 :                :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
                                132                 :                :  * to the file.
                                133                 :                :  *
                                134                 :                :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
                                135                 :                :  * the file.
                                136                 :                :  *
                                137                 :                :  * FS_READY indicates that it is now ok for a parallel apply worker to
                                138                 :                :  * read the file.
                                139                 :                :  */
                                140                 :                : typedef enum PartialFileSetState
                                141                 :                : {
                                142                 :                :     FS_EMPTY,
                                143                 :                :     FS_SERIALIZE_IN_PROGRESS,
                                144                 :                :     FS_SERIALIZE_DONE,
                                145                 :                :     FS_READY,
                                146                 :                : } PartialFileSetState;
                                147                 :                : 
                                148                 :                : /*
                                149                 :                :  * Struct for sharing information between leader apply worker and parallel
                                150                 :                :  * apply workers.
                                151                 :                :  */
                                152                 :                : typedef struct ParallelApplyWorkerShared
                                153                 :                : {
                                154                 :                :     slock_t     mutex;
                                155                 :                : 
                                156                 :                :     TransactionId xid;
                                157                 :                : 
                                158                 :                :     /*
                                159                 :                :      * State used to ensure commit ordering.
                                160                 :                :      *
                                161                 :                :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
                                162                 :                :      * handling the transaction finish commands while the apply leader will
                                163                 :                :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
                                164                 :                :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
                                165                 :                :      * STREAM_ABORT).
                                166                 :                :      */
                                167                 :                :     ParallelTransState xact_state;
                                168                 :                : 
                                169                 :                :     /* Information from the corresponding LogicalRepWorker slot. */
                                170                 :                :     uint16      logicalrep_worker_generation;
                                171                 :                :     int         logicalrep_worker_slot_no;
                                172                 :                : 
                                173                 :                :     /*
                                174                 :                :      * Indicates whether there are pending streaming blocks in the queue. The
                                175                 :                :      * parallel apply worker will check it before starting to wait.
                                176                 :                :      */
                                177                 :                :     pg_atomic_uint32 pending_stream_count;
                                178                 :                : 
                                179                 :                :     /*
                                180                 :                :      * XactLastCommitEnd from the parallel apply worker. This is required by
                                181                 :                :      * the leader worker so it can update the lsn_mappings.
                                182                 :                :      */
                                183                 :                :     XLogRecPtr  last_commit_end;
                                184                 :                : 
                                185                 :                :     /*
                                186                 :                :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
                                187                 :                :      * serialize changes to the file, and share the fileset with the parallel
                                188                 :                :      * apply worker when processing the transaction finish command. Then the
                                189                 :                :      * parallel apply worker will apply all the spooled messages.
                                190                 :                :      *
                                191                 :                :      * FileSet is used here instead of SharedFileSet because we need it to
                                192                 :                :      * survive after releasing the shared memory so that the leader apply
                                193                 :                :      * worker can re-use the same fileset for the next streaming transaction.
                                194                 :                :      */
                                195                 :                :     PartialFileSetState fileset_state;
                                196                 :                :     FileSet     fileset;
                                197                 :                : } ParallelApplyWorkerShared;
                                198                 :                : 
                                199                 :                : /*
                                200                 :                :  * Information which is used to manage the parallel apply worker.
                                201                 :                :  */
                                202                 :                : typedef struct ParallelApplyWorkerInfo
                                203                 :                : {
                                204                 :                :     /*
                                205                 :                :      * This queue is used to send changes from the leader apply worker to the
                                206                 :                :      * parallel apply worker.
                                207                 :                :      */
                                208                 :                :     shm_mq_handle *mq_handle;
                                209                 :                : 
                                210                 :                :     /*
                                211                 :                :      * This queue is used to transfer error messages from the parallel apply
                                212                 :                :      * worker to the leader apply worker.
                                213                 :                :      */
                                214                 :                :     shm_mq_handle *error_mq_handle;
                                215                 :                : 
                                216                 :                :     dsm_segment *dsm_seg;
                                217                 :                : 
                                218                 :                :     /*
                                219                 :                :      * Indicates whether the leader apply worker needs to serialize the
                                220                 :                :      * remaining changes to a file due to timeout when attempting to send data
                                221                 :                :      * to the parallel apply worker via shared memory.
                                222                 :                :      */
                                223                 :                :     bool        serialize_changes;
                                224                 :                : 
                                225                 :                :     /*
                                226                 :                :      * True if the worker is being used to process a parallel apply
                                227                 :                :      * transaction. False indicates this worker is available for re-use.
                                228                 :                :      */
                                229                 :                :     bool        in_use;
                                230                 :                : 
                                231                 :                :     ParallelApplyWorkerShared *shared;
                                232                 :                : } ParallelApplyWorkerInfo;
                                233                 :                : 
                                234                 :                : /* Main memory context for apply worker. Permanent during worker lifetime. */
                                235                 :                : extern PGDLLIMPORT MemoryContext ApplyContext;
                                236                 :                : 
                                237                 :                : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
                                238                 :                : 
                                239                 :                : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
                                240                 :                : 
                                241                 :                : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
                                242                 :                : 
                                243                 :                : /* libpqreceiver connection */
                                244                 :                : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
                                245                 :                : 
                                246                 :                : /* Worker and subscription objects. */
                                247                 :                : extern PGDLLIMPORT Subscription *MySubscription;
                                248                 :                : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
                                249                 :                : 
                                250                 :                : extern PGDLLIMPORT bool in_remote_transaction;
                                251                 :                : 
                                252                 :                : extern PGDLLIMPORT bool InitializingApplyWorker;
                                253                 :                : 
                                254                 :                : extern void logicalrep_worker_attach(int slot);
                                255                 :                : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                256                 :                :                                                 bool only_running);
                                257                 :                : extern List *logicalrep_workers_find(Oid subid, bool only_running,
                                258                 :                :                                      bool acquire_lock);
                                259                 :                : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                260                 :                :                                      Oid dbid, Oid subid, const char *subname,
                                261                 :                :                                      Oid userid, Oid relid,
                                262                 :                :                                      dsm_handle subworker_dsm,
                                263                 :                :                                      bool retain_dead_tuples);
                                264                 :                : extern void logicalrep_worker_stop(Oid subid, Oid relid);
                                265                 :                : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
                                266                 :                : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
                                267                 :                : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
                                268                 :                : 
                                269                 :                : extern int  logicalrep_sync_worker_count(Oid subid);
                                270                 :                : 
                                271                 :                : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
                                272                 :                :                                                char *originname, Size szoriginname);
                                273                 :                : 
                                274                 :                : extern bool AllTablesyncsReady(void);
                                275                 :                : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
                                276                 :                : 
                                277                 :                : extern void process_syncing_tables(XLogRecPtr current_lsn);
                                278                 :                : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
                                279                 :                :                                             uint32 hashvalue);
                                280                 :                : 
                                281                 :                : extern void stream_start_internal(TransactionId xid, bool first_segment);
                                282                 :                : extern void stream_stop_internal(TransactionId xid);
                                283                 :                : 
                                284                 :                : /* Common streaming function to apply all the spooled messages */
                                285                 :                : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                                286                 :                :                                    XLogRecPtr lsn);
                                287                 :                : 
                                288                 :                : extern void apply_dispatch(StringInfo s);
                                289                 :                : 
                                290                 :                : extern void maybe_reread_subscription(void);
                                291                 :                : 
                                292                 :                : extern void stream_cleanup_files(Oid subid, TransactionId xid);
                                293                 :                : 
                                294                 :                : extern void set_stream_options(WalRcvStreamOptions *options,
                                295                 :                :                                char *slotname,
                                296                 :                :                                XLogRecPtr *origin_startpos);
                                297                 :                : 
                                298                 :                : extern void start_apply(XLogRecPtr origin_startpos);
                                299                 :                : 
                                300                 :                : extern void InitializeLogRepWorker(void);
                                301                 :                : 
                                302                 :                : extern void SetupApplyOrSyncWorker(int worker_slot);
                                303                 :                : 
                                304                 :                : extern void DisableSubscriptionAndExit(void);
                                305                 :                : 
                                306                 :                : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
                                307                 :                : 
                                308                 :                : /* Function for apply error callback */
                                309                 :                : extern void apply_error_callback(void *arg);
                                310                 :                : extern void set_apply_error_context_origin(char *originname);
                                311                 :                : 
                                312                 :                : /* Parallel apply worker setup and interactions */
                                313                 :                : extern void pa_allocate_worker(TransactionId xid);
                                314                 :                : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
                                315                 :                : extern void pa_detach_all_error_mq(void);
                                316                 :                : 
                                317                 :                : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
                                318                 :                :                          const void *data);
                                319                 :                : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                                320                 :                :                                            bool stream_locked);
                                321                 :                : 
                                322                 :                : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                                323                 :                :                               ParallelTransState xact_state);
                                324                 :                : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
                                325                 :                : 
                                326                 :                : extern void pa_start_subtrans(TransactionId current_xid,
                                327                 :                :                               TransactionId top_xid);
                                328                 :                : extern void pa_reset_subtrans(void);
                                329                 :                : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
                                330                 :                : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                                331                 :                :                                  PartialFileSetState fileset_state);
                                332                 :                : 
                                333                 :                : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
                                334                 :                : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
                                335                 :                : 
                                336                 :                : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
                                337                 :                : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
                                338                 :                : 
                                339                 :                : extern void pa_decr_and_wait_stream_block(void);
                                340                 :                : 
                                341                 :                : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                                342                 :                :                            XLogRecPtr remote_lsn);
                                343                 :                : 
                                344                 :                : #define isParallelApplyWorker(worker) ((worker)->in_use && \
                                345                 :                :                                        (worker)->type == WORKERTYPE_PARALLEL_APPLY)
                                346                 :                : #define isTablesyncWorker(worker) ((worker)->in_use && \
                                347                 :                :                                    (worker)->type == WORKERTYPE_TABLESYNC)
                                348                 :                : 
                                349                 :                : static inline bool
 3089 peter_e@gmx.net           350                 :CBC        1280 : am_tablesync_worker(void)
                                351                 :                : {
  754 akapila@postgresql.o      352   [ +  -  +  + ]:           1280 :     return isTablesyncWorker(MyLogicalRepWorker);
                                353                 :                : }
                                354                 :                : 
                                355                 :                : static inline bool
  971                           356                 :         205207 : am_leader_apply_worker(void)
                                357                 :                : {
  743                           358         [ -  + ]:         205207 :     Assert(MyLogicalRepWorker->in_use);
  754                           359                 :         205207 :     return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
                                360                 :                : }
                                361                 :                : 
                                362                 :                : static inline bool
  971                           363                 :         327318 : am_parallel_apply_worker(void)
                                364                 :                : {
  743                           365         [ -  + ]:         327318 :     Assert(MyLogicalRepWorker->in_use);
  971                           366   [ +  -  +  + ]:         327318 :     return isParallelApplyWorker(MyLogicalRepWorker);
                                367                 :                : }
                                368                 :                : 
                                369                 :                : #endif                          /* WORKER_INTERNAL_H */
        

Generated by: LCOV version 2.4-beta