LCOV - differential code coverage report
Current view: top level - src/backend/replication/logical - applyparallelworker.c (source / functions) Coverage Total Hit UNC UBC GNC CBC DUB DCB
Current: c70b6db34ffeab48beef1fb4ce61bcad3772b8dd vs 06473f5a344df8c9594ead90a609b86f6724cff8 Lines: 89.4 % 425 380 1 44 2 378 1 2
Current Date: 2025-09-06 07:49:51 +0900 Functions: 100.0 % 36 36 2 34
Baseline: lcov-20250906-005545-baseline Branches: 60.9 % 243 148 1 94 1 147
Baseline Date: 2025-09-05 08:21:35 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 50.0 % 2 1 1 1
(30,360] days: 100.0 % 4 4 1 3
(360..) days: 89.5 % 419 375 44 375
Function coverage date bins:
(30,360] days: 100.0 % 2 2 1 1
(360..) days: 100.0 % 34 34 1 33
Branch coverage date bins:
(30,360] days: 50.0 % 2 1 1 1
(360..) days: 61.0 % 241 147 94 147

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  * applyparallelworker.c
                                  3                 :                :  *     Support routines for applying xact by parallel apply worker
                                  4                 :                :  *
                                  5                 :                :  * Copyright (c) 2023-2025, PostgreSQL Global Development Group
                                  6                 :                :  *
                                  7                 :                :  * IDENTIFICATION
                                  8                 :                :  *    src/backend/replication/logical/applyparallelworker.c
                                  9                 :                :  *
                                 10                 :                :  * This file contains the code to launch, set up, and tear down a parallel apply
                                 11                 :                :  * worker which receives the changes from the leader worker and invokes routines
                                 12                 :                :  * to apply those on the subscriber database. Additionally, this file contains
                                 13                 :                :  * routines that are intended to support setting up, using, and tearing down a
                                 14                 :                :  * ParallelApplyWorkerInfo which is required so the leader worker and parallel
                                 15                 :                :  * apply workers can communicate with each other.
                                 16                 :                :  *
                                 17                 :                :  * The parallel apply workers are assigned (if available) as soon as xact's
                                 18                 :                :  * first stream is received for subscriptions that have set their 'streaming'
                                 19                 :                :  * option as parallel. The leader apply worker will send changes to this new
                                 20                 :                :  * worker via shared memory. We keep this worker assigned till the transaction
                                 21                 :                :  * commit is received and also wait for the worker to finish at commit. This
                                 22                 :                :  * preserves commit ordering and avoids file I/O in most cases, although we
                                 23                 :                :  * still need to spill to a file if there is no worker available. See comments
                                 24                 :                :  * atop logical/worker to know more about streamed xacts whose changes are
                                 25                 :                :  * spilled to disk. It is important to maintain commit order to avoid failures
                                 26                 :                :  * due to: (a) transaction dependencies - say if we insert a row in the first
                                 27                 :                :  * transaction and update it in the second transaction on publisher then
                                 28                 :                :  * allowing the subscriber to apply both in parallel can lead to failure in the
                                 29                 :                :  * update; (b) deadlocks - allowing transactions that update the same set of
                                 30                 :                :  * rows/tables in the opposite order to be applied in parallel can lead to
                                 31                 :                :  * deadlocks.
                                 32                 :                :  *
                                 33                 :                :  * A worker pool is used to avoid restarting workers for each streaming
                                 34                 :                :  * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
                                 35                 :                :  * in the ParallelApplyWorkerPool. After successfully launching a new worker,
                                 36                 :                :  * its information is added to the ParallelApplyWorkerPool. Once the worker
                                 37                 :                :  * finishes applying the transaction, it is marked as available for re-use.
                                 38                 :                :  * Now, before starting a new worker to apply the streaming transaction, we
                                 39                 :                :  * check the list for any available worker. Note that we retain a maximum of
                                 40                 :                :  * half the max_parallel_apply_workers_per_subscription workers in the pool and
                                 41                 :                :  * after that, we simply exit the worker after applying the transaction.
                                 42                 :                :  *
                                 43                 :                :  * XXX This worker pool threshold is arbitrary and we can provide a GUC
                                 44                 :                :  * variable for this in the future if required.
                                 45                 :                :  *
                                 46                 :                :  * The leader apply worker will create a separate dynamic shared memory segment
                                 47                 :                :  * when each parallel apply worker starts. The reason for this design is that
                                 48                 :                :  * we cannot predict how many workers will be needed. It may be possible to
                                 49                 :                :  * allocate enough shared memory in one segment based on the maximum number of
                                 50                 :                :  * parallel apply workers (max_parallel_apply_workers_per_subscription), but
                                 51                 :                :  * this would waste memory if no process is actually started.
                                 52                 :                :  *
                                 53                 :                :  * The dynamic shared memory segment contains: (a) a shm_mq that is used to
                                 54                 :                :  * send changes in the transaction from leader apply worker to parallel apply
                                 55                 :                :  * worker; (b) another shm_mq that is used to send errors (and other messages
                                 56                 :                :  * reported via elog/ereport) from the parallel apply worker to leader apply
                                 57                 :                :  * worker; (c) necessary information to be shared among parallel apply workers
                                 58                 :                :  * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
                                 59                 :                :  *
                                 60                 :                :  * Locking Considerations
                                 61                 :                :  * ----------------------
                                 62                 :                :  * We have a risk of deadlock due to concurrently applying the transactions in
                                 63                 :                :  * parallel mode that were independent on the publisher side but became
                                 64                 :                :  * dependent on the subscriber side due to the different database structures
                                 65                 :                :  * (like schema of subscription tables, constraints, etc.) on each side. This
                                 66                 :                :  * can happen even without parallel mode when there are concurrent operations
                                 67                 :                :  * on the subscriber. In order to detect the deadlocks among leader (LA) and
                                 68                 :                :  * parallel apply (PA) workers, we use lmgr locks when the PA waits for the
                                 69                 :                :  * next stream (set of changes) and LA waits for PA to finish the transaction.
                                 70                 :                :  * An alternative approach could be to not allow parallelism when the schema of
                                 71                 :                :  * tables is different between the publisher and subscriber but that would be
                                 72                 :                :  * too restrictive and would require the publisher to send much more
                                 73                 :                :  * information than it is currently sending.
                                 74                 :                :  *
                                 75                 :                :  * Consider a case where the subscribed table does not have a unique key on the
                                 76                 :                :  * publisher and has a unique key on the subscriber. The deadlock can happen in
                                 77                 :                :  * the following ways:
                                 78                 :                :  *
                                 79                 :                :  * 1) Deadlock between the leader apply worker and a parallel apply worker
                                 80                 :                :  *
                                 81                 :                :  * Consider that the parallel apply worker (PA) is executing TX-1 and the
                                 82                 :                :  * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
                                 83                 :                :  * Now, LA is waiting for PA because of the unique key constraint of the
                                 84                 :                :  * subscribed table while PA is waiting for LA to send the next stream of
                                 85                 :                :  * changes or transaction finish command message.
                                 86                 :                :  *
                                 87                 :                :  * In order for lmgr to detect this, we have LA acquire a session lock on the
                                 88                 :                :  * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
                                 89                 :                :  * trying to receive the next stream of changes. Specifically, LA will acquire
                                 90                 :                :  * the lock in AccessExclusive mode before sending the STREAM_STOP and will
                                 91                 :                :  * release it if already acquired after sending the STREAM_START, STREAM_ABORT
                                 92                 :                :  * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
                                 93                 :                :  * acquire the lock in AccessShare mode after processing STREAM_STOP and
                                 94                 :                :  * STREAM_ABORT (for subtransaction) and then release the lock immediately
                                 95                 :                :  * after acquiring it.
                                 96                 :                :  *
                                 97                 :                :  * The lock graph for the above example will look as follows:
                                 98                 :                :  * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
                                 99                 :                :  * acquire the stream lock) -> LA
                                100                 :                :  *
                                101                 :                :  * This way, when PA is waiting for LA for the next stream of changes, we can
                                102                 :                :  * have a wait-edge from PA to LA in lmgr, which will make us detect the
                                103                 :                :  * deadlock between LA and PA.
                                104                 :                :  *
                                105                 :                :  * 2) Deadlock between the leader apply worker and parallel apply workers
                                106                 :                :  *
                                107                 :                :  * This scenario is similar to the first case but TX-1 and TX-2 are executed by
                                108                 :                :  * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
                                109                 :                :  * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
                                110                 :                :  * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
                                111                 :                :  * transaction in order to preserve the commit order. There is a deadlock among
                                112                 :                :  * the three processes.
                                113                 :                :  *
                                114                 :                :  * In order for lmgr to detect this, we have PA acquire a session lock (this is
                                115                 :                :  * a different lock than the one referred to in the previous case, see
                                116                 :                :  * pa_lock_transaction()) on the transaction being applied and have LA wait on
                                117                 :                :  * the lock before proceeding in the transaction finish commands. Specifically,
                                118                 :                :  * PA will acquire this lock in AccessExclusive mode before executing the first
                                119                 :                :  * message of the transaction and release it at the xact end. LA will acquire
                                120                 :                :  * this lock in AccessShare mode at transaction finish commands and release it
                                121                 :                :  * immediately.
                                122                 :                :  *
                                123                 :                :  * The lock graph for the above example will look as follows:
                                124                 :                :  * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
                                125                 :                :  * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
                                126                 :                :  * lock) -> LA
                                127                 :                :  *
                                128                 :                :  * This way when LA is waiting to finish the transaction end command to preserve
                                129                 :                :  * the commit order, we will be able to detect deadlock, if any.
                                130                 :                :  *
                                131                 :                :  * One might think we can use XactLockTableWait(), but XactLockTableWait()
                                132                 :                :  * considers PREPARED TRANSACTION as still in progress which means the lock
                                133                 :                :  * won't be released even after the parallel apply worker has prepared the
                                134                 :                :  * transaction.
                                135                 :                :  *
                                136                 :                :  * 3) Deadlock when the shm_mq buffer is full
                                137                 :                :  *
                                138                 :                :  * In the previous scenario (i.e. PA-1 and PA-2 are executing transactions
                                139                 :                :  * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
                                140                 :                :  * wait to send messages, and this wait doesn't appear in lmgr.
                                141                 :                :  *
                                142                 :                :  * To avoid this wait, we use a non-blocking write and wait with a timeout. If
                                143                 :                :  * the timeout is exceeded, the LA will serialize all the pending messages to
                                144                 :                :  * a file and indicate to PA-2 that it needs to read that file for the remaining
                                145                 :                :  * messages. Then LA will start waiting for commit as in the previous case
                                146                 :                :  * which will detect deadlock if any. See pa_send_data() and
                                147                 :                :  * enum TransApplyAction.
                                148                 :                :  *
                                149                 :                :  * Lock types
                                150                 :                :  * ----------
                                151                 :                :  * Both the stream lock and the transaction lock mentioned above are
                                152                 :                :  * session-level locks because both locks could be acquired outside the
                                153                 :                :  * transaction, and the stream lock in the leader needs to persist across
                                154                 :                :  * transaction boundaries i.e. until the end of the streaming transaction.
                                155                 :                :  *-------------------------------------------------------------------------
                                156                 :                :  */
                                157                 :                : 
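The deadlock scenarios in the header comment above come down to one point: lmgr can only report a deadlock if every wait in the cycle is visible to it as a lock wait. The sketch below models that idea for scenario 2 with a tiny wait-for table. It is an illustration only, not PostgreSQL's deadlock detector; the names `PROC_LA`, `PROC_PA1`, `PROC_PA2`, `NO_WAIT`, and `wait_cycle_from` are hypothetical.

```c
#include <stdbool.h>

/* Hypothetical process slots for the sketch; these are not PostgreSQL types. */
enum
{
    PROC_LA = 0,                /* leader apply worker */
    PROC_PA1 = 1,               /* parallel apply worker running TX-1 */
    PROC_PA2 = 2,               /* parallel apply worker running TX-2 */
    NPROCS = 3,
    NO_WAIT = -1                /* process has no wait visible to the lock manager */
};

/*
 * waits_for[i] is the process that i is blocked on, or NO_WAIT.
 * Returns true if following the wait edges from 'start' leads back to 'start'.
 */
static bool
wait_cycle_from(const int waits_for[NPROCS], int start)
{
    int         cur = waits_for[start];
    int         steps;

    /* A simple cycle through 'start' has at most NPROCS edges. */
    for (steps = 0; steps < NPROCS && cur != NO_WAIT; steps++)
    {
        if (cur == start)
            return true;
        cur = waits_for[cur];
    }
    return false;
}
```

With the table `{PROC_PA2, PROC_LA, PROC_PA1}` (LA waits for PA-2, PA-1 waits for LA, PA-2 waits for PA-1) the function reports a cycle from `PROC_LA`. If PA-1's wait is replaced by `NO_WAIT`, which stands for a wait on the shm_mq that the lock manager cannot see, no cycle is found. That missing edge is what the stream lock is meant to supply.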
                                158                 :                : #include "postgres.h"
                                159                 :                : 
                                160                 :                : #include "libpq/pqformat.h"
                                161                 :                : #include "libpq/pqmq.h"
                                162                 :                : #include "pgstat.h"
                                163                 :                : #include "postmaster/interrupt.h"
                                164                 :                : #include "replication/logicallauncher.h"
                                165                 :                : #include "replication/logicalworker.h"
                                166                 :                : #include "replication/origin.h"
                                167                 :                : #include "replication/worker_internal.h"
                                168                 :                : #include "storage/ipc.h"
                                169                 :                : #include "storage/lmgr.h"
                                170                 :                : #include "tcop/tcopprot.h"
                                171                 :                : #include "utils/inval.h"
                                172                 :                : #include "utils/memutils.h"
                                173                 :                : #include "utils/syscache.h"
                                174                 :                : 
                                175                 :                : #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
                                176                 :                : 
                                177                 :                : /*
                                178                 :                :  * DSM keys for parallel apply worker. Unlike other parallel execution code,
                                179                 :                :  * since we don't need to worry about DSM keys conflicting with plan_node_id we
                                180                 :                :  * can use small integers.
                                181                 :                :  */
                                182                 :                : #define PARALLEL_APPLY_KEY_SHARED       1
                                183                 :                : #define PARALLEL_APPLY_KEY_MQ           2
                                184                 :                : #define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
                                185                 :                : 
                                186                 :                : /* Queue size of DSM, 16 MB for now. */
                                187                 :                : #define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
                                188                 :                : 
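The third scenario in the header comment describes what happens when this queue fills up: the leader tries a non-blocking write, retries until a timeout, and then falls back to serializing the pending messages to a file. A minimal sketch of that control flow follows, assuming a byte-counting stand-in for the queue and an attempt counter in place of a real timeout. `DemoQueue`, `demo_try_send`, and `demo_send` are hypothetical names; the actual logic lives in pa_send_data(), which is not shown in this section.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a bounded message queue: tracks free space only. */
typedef struct DemoQueue
{
    size_t      free_bytes;
} DemoQueue;

typedef enum
{
    DEMO_SENT,                  /* message fit into the queue */
    DEMO_SPILL_TO_FILE          /* caller should serialize the message to a file */
} DemoSendResult;

/* Non-blocking write: succeeds only if the whole message fits right now. */
static bool
demo_try_send(DemoQueue *q, size_t len)
{
    if (len > q->free_bytes)
        return false;
    q->free_bytes -= len;
    return true;
}

/*
 * Retry the non-blocking write up to max_attempts times, then give up and
 * tell the caller to spill. A real sender would wait between attempts; the
 * sketch only counts attempts.
 */
static DemoSendResult
demo_send(DemoQueue *q, size_t len, int max_attempts)
{
    int         attempt;

    for (attempt = 0; attempt < max_attempts; attempt++)
    {
        if (demo_try_send(q, len))
            return DEMO_SENT;
    }
    return DEMO_SPILL_TO_FILE;
}
```

The point of the design is that the sender never blocks indefinitely on a wait the lock manager cannot see; once it gives up on the queue, it can move on to a lock wait that does participate in deadlock detection.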
                                189                 :                : /*
                                190                 :                :  * Error queue size of DSM. It is desirable to make it large enough that a
                                191                 :                :  * typical ErrorResponse can be sent without blocking. That way, a worker that
                                192                 :                :  * errors out can write the whole message into the queue and terminate without
                                193                 :                :  * waiting for the user backend.
                                194                 :                :  */
                                195                 :                : #define DSM_ERROR_QUEUE_SIZE            (16 * 1024)
                                196                 :                : 
                                197                 :                : /*
                                198                 :                :  * There are three fields in each message received by the parallel apply
                                199                 :                :  * worker: start_lsn, end_lsn and send_time. Because we have updated these
                                200                 :                :  * statistics in the leader apply worker, we can ignore these fields in the
                                201                 :                :  * parallel apply worker (see function LogicalRepApplyLoop).
                                202                 :                :  */
                                203                 :                : #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
                                204                 :                : 
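As a worked example of the macro above: XLogRecPtr and TimestampTz are both 64-bit types, so the three statistics fields occupy 2 * 8 + 8 = 24 bytes. The sketch below shows how a receiver might step over them. The typedefs are stand-ins so the block compiles outside the PostgreSQL tree, `demo_skip_stats` is a hypothetical helper, and the sketch assumes the three fields sit at the very start of the buffer it is given.

```c
#include <stddef.h>
#include <stdint.h>

/* Stand-ins so the sketch compiles outside the PostgreSQL tree. */
typedef uint64_t XLogRecPtr;
typedef int64_t TimestampTz;

#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))

/*
 * Hypothetical helper: return a pointer just past the start_lsn, end_lsn and
 * send_time fields, and store the number of bytes left in *remaining.
 * Returns NULL if the buffer is too short to contain the three fields.
 */
static const char *
demo_skip_stats(const char *msg, size_t len, size_t *remaining)
{
    if (len < SIZE_STATS_MESSAGE)
        return NULL;

    *remaining = len - SIZE_STATS_MESSAGE;
    return msg + SIZE_STATS_MESSAGE;
}
```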
                                205                 :                : /*
                                206                 :                :  * The type of session-level lock on a transaction being applied on a logical
                                207                 :                :  * replication subscriber.
                                208                 :                :  */
                                209                 :                : #define PARALLEL_APPLY_LOCK_STREAM  0
                                210                 :                : #define PARALLEL_APPLY_LOCK_XACT    1
                                211                 :                : 
                                212                 :                : /*
                                213                 :                :  * Hash table entry to map xid to the parallel apply worker state.
                                214                 :                :  */
                                215                 :                : typedef struct ParallelApplyWorkerEntry
                                216                 :                : {
                                217                 :                :     TransactionId xid;          /* Hash key -- must be first */
                                218                 :                :     ParallelApplyWorkerInfo *winfo;
                                219                 :                : } ParallelApplyWorkerEntry;
                                220                 :                : 
                                221                 :                : /*
                                222                 :                :  * A hash table used to cache the state of streaming transactions being applied
                                223                 :                :  * by the parallel apply workers.
                                224                 :                :  */
                                225                 :                : static HTAB *ParallelApplyTxnHash = NULL;
                                226                 :                : 
                                227                 :                : /*
                                228                 :                :  * A list (pool) of active parallel apply workers. The information for
                                229                 :                :  * the new worker is added to the list after successfully launching it. The
                                230                 :                :  * list entry is removed if there are already enough workers in the worker
                                231                 :                :  * pool at the end of the transaction. For more information about the worker
                                232                 :                :  * pool, see comments atop this file.
                                233                 :                :  */
                                234                 :                : static List *ParallelApplyWorkerPool = NIL;
                                235                 :                : 
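The retention rule described in the header comment (keep at most half of max_parallel_apply_workers_per_subscription workers in the pool) can be sketched as a single predicate. This is an assumption-laden illustration: `demo_pool_keeps_worker` is a hypothetical name, and the exact comparison used by the real code is not visible in this section, so the boundary behaviour here is a guess.

```c
#include <stdbool.h>

/*
 * Hypothetical predicate: after a worker finishes its transaction, keep it in
 * the pool only while the pool holds no more than half of the configured
 * per-subscription maximum. Integer division rounds the threshold down.
 */
static bool
demo_pool_keeps_worker(int pool_size, int max_workers_per_sub)
{
    return pool_size <= max_workers_per_sub / 2;
}
```

For example, with a maximum of 4 the sketch keeps a worker when the pool holds 2 entries and lets it exit when the pool holds 3.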
                                236                 :                : /*
                                237                 :                :  * Information shared between leader apply worker and parallel apply worker.
                                238                 :                :  */
                                239                 :                : ParallelApplyWorkerShared *MyParallelShared = NULL;
                                240                 :                : 
                                241                 :                : /*
                                242                 :                :  * Is there a message sent by a parallel apply worker that the leader apply
                                243                 :                :  * worker needs to receive?
                                244                 :                :  */
                                245                 :                : volatile sig_atomic_t ParallelApplyMessagePending = false;
                                246                 :                : 
                                247                 :                : /*
                                248                 :                :  * Cache the parallel apply worker information required for applying the
                                249                 :                :  * current streaming transaction. It is used to save the cost of searching the
                                250                 :                :  * hash table when applying the changes between STREAM_START and STREAM_STOP.
                                251                 :                :  */
                                252                 :                : static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
                                253                 :                : 
                                254                 :                : /* A list to maintain subtransactions, if any. */
                                255                 :                : static List *subxactlist = NIL;
                                256                 :                : 
                                257                 :                : static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
                                258                 :                : static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
                                259                 :                : static PartialFileSetState pa_get_fileset_state(void);
                                260                 :                : 
                                261                 :                : /*
                                262                 :                :  * Returns true if it is OK to start a parallel apply worker, false otherwise.
                                263                 :                :  */
                                264                 :                : static bool
  971 akapila@postgresql.o      265                 :CBC          82 : pa_can_start(void)
                                266                 :                : {
                                267                 :                :     /* Only leader apply workers can start parallel apply workers. */
                                268         [ +  + ]:             82 :     if (!am_leader_apply_worker())
                                269                 :             27 :         return false;
                                270                 :                : 
                                271                 :                :     /*
                                272                 :                :      * Check for changes to the subscription parameters here; otherwise a
                                273                 :                :      * change could go unnoticed for a very long time. This can happen when
                                274                 :                :      * there is a constant flow of streaming transactions that are handled
                                275                 :                :      * by parallel apply workers.
                                276                 :                :      *
                                277                 :                :      * It is better to do it before the below checks so that the latest values
                                278                 :                :      * of subscription can be used for the checks.
                                279                 :                :      */
                                280                 :             55 :     maybe_reread_subscription();
                                281                 :                : 
                                282                 :                :     /*
                                283                 :                :      * Don't start a new parallel apply worker if the subscription is not
                                284                 :                :      * using parallel streaming mode, or if the publisher does not support
                                285                 :                :      * parallel apply.
                                286                 :                :      */
                                287         [ +  + ]:             55 :     if (!MyLogicalRepWorker->parallel_apply)
                                288                 :             28 :         return false;
                                289                 :                : 
                                290                 :                :     /*
                                291                 :                :      * Don't start a new parallel worker if the user has set skiplsn, as it's
                                292                 :                :      * possible that they want to skip the streaming transaction. For
                                293                 :                :      * streaming transactions, we need to serialize the transaction to a file
                                294                 :                :      * so that we can get the last LSN of the transaction to judge whether to
                                295                 :                :      * skip before starting to apply the change.
                                296                 :                :      *
                                297                 :                :      * One might think that we could allow parallelism if the first lsn of the
                                298                 :                :      * transaction is greater than skiplsn, but we don't send it with the
                                299                 :                :      * STREAM START message, and it doesn't seem worth sending the extra eight
                                300                 :                :      * bytes with the STREAM START to enable parallelism for this case.
                                301                 :                :      */
                                302         [ -  + ]:             27 :     if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
  971 akapila@postgresql.o      303                 :UBC           0 :         return false;
                                304                 :                : 
                                305                 :                :     /*
                                306                 :                :      * For streaming transactions that are being applied using a parallel
                                307                 :                :      * apply worker, we cannot decide whether to apply the change for a
                                308                 :                :      * relation that is not in the READY state (see
                                309                 :                :      * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
                                310                 :                :      * time. So, we don't start the new parallel apply worker in this case.
                                311                 :                :      */
  971 akapila@postgresql.o      312         [ -  + ]:CBC          27 :     if (!AllTablesyncsReady())
  971 akapila@postgresql.o      313                 :UBC           0 :         return false;
                                314                 :                : 
  971 akapila@postgresql.o      315                 :CBC          27 :     return true;
                                316                 :                : }
                                317                 :                : 
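The order of the checks in pa_can_start() above can be restated as a small standalone sketch. This is only an illustration, not the PostgreSQL implementation: the StartInputs struct, the StartDecision enum, and can_start_sketch() are hypothetical names, and the real function reads worker and subscription state directly and calls maybe_reread_subscription() after the leader check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical inputs standing in for the state pa_can_start() consults. */
typedef struct StartInputs
{
    bool        is_leader;              /* am_leader_apply_worker() */
    bool        parallel_streaming;     /* MyLogicalRepWorker->parallel_apply */
    bool        skiplsn_set;            /* skiplsn is a valid LSN */
    bool        all_tablesyncs_ready;   /* AllTablesyncsReady() */
} StartInputs;

/* Hypothetical result codes; the real function only returns a bool. */
typedef enum StartDecision
{
    START_OK,
    START_NOT_LEADER,
    START_NOT_PARALLEL_MODE,
    START_SKIPLSN_SET,
    START_TABLESYNC_PENDING
} StartDecision;

/*
 * Apply the checks in the same order as the listing: the first failing
 * check decides the outcome, so later conditions are never evaluated.
 */
static StartDecision
can_start_sketch(const StartInputs *in)
{
    assert(in != NULL);

    if (!in->is_leader)
        return START_NOT_LEADER;
    /* The real code re-reads the subscription at this point. */
    if (!in->parallel_streaming)
        return START_NOT_PARALLEL_MODE;
    if (in->skiplsn_set)
        return START_SKIPLSN_SET;
    if (!in->all_tablesyncs_ready)
        return START_TABLESYNC_PENDING;

    return START_OK;
}
```

Returning a reason code instead of a bool is a choice made for this sketch only; it makes it easy to see which guard rejected a request, which the coverage counts above (27, 28, 0, 0 early returns) otherwise have to be read for.
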
                                318                 :                : /*
                                319                 :                :  * Set up a dynamic shared memory segment.
                                320                 :                :  *
                                321                 :                :  * We set up a control region that contains a fixed-size worker info
                                322                 :                :  * (ParallelApplyWorkerShared), a message queue, and an error queue.
                                323                 :                :  *
                                324                 :                :  * Returns true on success, false on failure.
                                325                 :                :  */
                                326                 :                : static bool
                                327                 :             10 : pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
                                328                 :                : {
                                329                 :                :     shm_toc_estimator e;
                                330                 :                :     Size        segsize;
                                331                 :                :     dsm_segment *seg;
                                332                 :                :     shm_toc    *toc;
                                333                 :                :     ParallelApplyWorkerShared *shared;
                                334                 :                :     shm_mq     *mq;
                                335                 :             10 :     Size        queue_size = DSM_QUEUE_SIZE;
                                336                 :             10 :     Size        error_queue_size = DSM_ERROR_QUEUE_SIZE;
                                337                 :                : 
                                338                 :                :     /*
                                339                 :                :      * Estimate how much shared memory we need.
                                340                 :                :      *
                                341                 :                :      * Because the TOC machinery may choose to insert padding of oddly-sized
                                342                 :                :      * requests, we must estimate each chunk separately.
                                343                 :                :      *
                                344                 :                :      * We need one key to register the location of the header, and two other
                                345                 :                :      * keys to track the locations of the message queue and the error message
                                346                 :                :      * queue.
                                347                 :                :      */
                                348                 :             10 :     shm_toc_initialize_estimator(&e);
                                349                 :             10 :     shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
                                350                 :             10 :     shm_toc_estimate_chunk(&e, queue_size);
                                351                 :             10 :     shm_toc_estimate_chunk(&e, error_queue_size);
                                352                 :                : 
                                353                 :             10 :     shm_toc_estimate_keys(&e, 3);
                                354                 :             10 :     segsize = shm_toc_estimate(&e);
                                355                 :                : 
                                356                 :                :     /* Create the shared memory segment and establish a table of contents. */
                                357                 :             10 :     seg = dsm_create(segsize, 0);
                                358         [ -  + ]:             10 :     if (!seg)
  971 akapila@postgresql.o      359                 :UBC           0 :         return false;
                                360                 :                : 
  971 akapila@postgresql.o      361                 :CBC          10 :     toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
                                362                 :                :                          segsize);
                                363                 :                : 
                                364                 :                :     /* Set up the header region. */
                                365                 :             10 :     shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
                                366                 :             10 :     SpinLockInit(&shared->mutex);
                                367                 :                : 
                                368                 :             10 :     shared->xact_state = PARALLEL_TRANS_UNKNOWN;
                                369                 :             10 :     pg_atomic_init_u32(&(shared->pending_stream_count), 0);
                                370                 :             10 :     shared->last_commit_end = InvalidXLogRecPtr;
                                371                 :             10 :     shared->fileset_state = FS_EMPTY;
                                372                 :                : 
                                373                 :             10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
                                374                 :                : 
                                375                 :                :     /* Set up message queue for the worker. */
                                376                 :             10 :     mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
                                377                 :             10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
                                378                 :             10 :     shm_mq_set_sender(mq, MyProc);
                                379                 :                : 
                                380                 :                :     /* Attach the queue. */
                                381                 :             10 :     winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
                                382                 :                : 
                                383                 :                :     /* Set up error queue for the worker. */
                                384                 :             10 :     mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
                                385                 :                :                        error_queue_size);
                                386                 :             10 :     shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
                                387                 :             10 :     shm_mq_set_receiver(mq, MyProc);
                                388                 :                : 
                                389                 :                :     /* Attach the queue. */
                                390                 :             10 :     winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
                                391                 :                : 
                                392                 :                :     /* Return results to caller. */
                                393                 :             10 :     winfo->dsm_seg = seg;
                                394                 :             10 :     winfo->shared = shared;
                                395                 :                : 
                                396                 :             10 :     return true;
                                397                 :                : }
                                398                 :                : 
                                399                 :                : /*
                                400                 :                :  * Try to get a parallel apply worker from the pool. If none is available then
                                401                 :                :  * start a new one.
                                402                 :                :  */
                                403                 :                : static ParallelApplyWorkerInfo *
                                404                 :             27 : pa_launch_parallel_worker(void)
                                405                 :                : {
                                406                 :                :     MemoryContext oldcontext;
                                407                 :                :     bool        launched;
                                408                 :                :     ParallelApplyWorkerInfo *winfo;
                                409                 :                :     ListCell   *lc;
                                410                 :                : 
                                411                 :                :     /* Try to get an available parallel apply worker from the worker pool. */
                                412   [ +  +  +  +  :             29 :     foreach(lc, ParallelApplyWorkerPool)
                                              +  + ]
                                413                 :                :     {
                                414                 :             19 :         winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
                                415                 :                : 
                                416         [ +  + ]:             19 :         if (!winfo->in_use)
                                417                 :             17 :             return winfo;
                                418                 :                :     }
                                419                 :                : 
                                420                 :                :     /*
                                421                 :                :      * Start a new parallel apply worker.
                                422                 :                :      *
                                423                 :                :      * The worker info can be used for the lifetime of the worker process, so
                                424                 :                :      * create it in a permanent context.
                                425                 :                :      */
                                426                 :             10 :     oldcontext = MemoryContextSwitchTo(ApplyContext);
                                427                 :                : 
                                428                 :             10 :     winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
                                429                 :                : 
                                430                 :                :     /* Set up shared memory. */
                                431         [ -  + ]:             10 :     if (!pa_setup_dsm(winfo))
                                432                 :                :     {
  971 akapila@postgresql.o      433                 :UBC           0 :         MemoryContextSwitchTo(oldcontext);
                                434                 :              0 :         pfree(winfo);
                                435                 :              0 :         return NULL;
                                436                 :                :     }
                                437                 :                : 
  754 akapila@postgresql.o      438                 :CBC          10 :     launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
                                439                 :             10 :                                         MyLogicalRepWorker->dbid,
  971                           440                 :             10 :                                         MySubscription->oid,
                                441                 :             10 :                                         MySubscription->name,
                                442                 :             10 :                                         MyLogicalRepWorker->userid,
                                443                 :                :                                         InvalidOid,
                                444                 :                :                                         dsm_segment_handle(winfo->dsm_seg),
                                445                 :                :                                         false);
                                446                 :                : 
                                447         [ +  - ]:             10 :     if (launched)
                                448                 :                :     {
                                449                 :             10 :         ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
                                450                 :                :     }
                                451                 :                :     else
                                452                 :                :     {
  971 akapila@postgresql.o      453                 :UBC           0 :         pa_free_worker_info(winfo);
                                454                 :              0 :         winfo = NULL;
                                455                 :                :     }
                                456                 :                : 
  971 akapila@postgresql.o      457                 :CBC          10 :     MemoryContextSwitchTo(oldcontext);
                                458                 :                : 
                                459                 :             10 :     return winfo;
                                460                 :                : }
                                461                 :                : 
                                462                 :                : /*
                                463                 :                :  * Allocate a parallel apply worker that will be used for the specified xid.
                                464                 :                :  *
                                465                 :                :  * We first try to get an available worker from the pool; if none is
                                466                 :                :  * available, we try to launch a new one. On successful allocation, remember
                                467                 :                :  * the worker information in the hash table so that we can get it later for
                                468                 :                :  * processing the streaming changes.
                                469                 :                :  */
                                470                 :                : void
                                471                 :             82 : pa_allocate_worker(TransactionId xid)
                                472                 :                : {
                                473                 :                :     bool        found;
                                474                 :             82 :     ParallelApplyWorkerInfo *winfo = NULL;
                                475                 :                :     ParallelApplyWorkerEntry *entry;
                                476                 :                : 
                                477         [ +  + ]:             82 :     if (!pa_can_start())
                                478                 :             55 :         return;
                                479                 :                : 
  967                           480                 :             27 :     winfo = pa_launch_parallel_worker();
                                481         [ -  + ]:             27 :     if (!winfo)
  967 akapila@postgresql.o      482                 :UBC           0 :         return;
                                483                 :                : 
                                484                 :                :     /* First time through, initialize parallel apply worker state hashtable. */
  971 akapila@postgresql.o      485         [ +  + ]:CBC          27 :     if (!ParallelApplyTxnHash)
                                486                 :                :     {
                                487                 :                :         HASHCTL     ctl;
                                488                 :                : 
                                489   [ +  -  +  -  :             91 :         MemSet(&ctl, 0, sizeof(ctl));
                                     +  -  +  -  +  
                                                 + ]
                                490                 :              7 :         ctl.keysize = sizeof(TransactionId);
                                491                 :              7 :         ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
                                492                 :              7 :         ctl.hcxt = ApplyContext;
                                493                 :                : 
                                494                 :              7 :         ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
                                495                 :                :                                            16, &ctl,
                                496                 :                :                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
                                497                 :                :     }
                                498                 :                : 
                                499                 :                :     /* Create an entry for the requested transaction. */
                                500                 :             27 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
                                501         [ -  + ]:             27 :     if (found)
  971 akapila@postgresql.o      502         [ #  # ]:UBC           0 :         elog(ERROR, "hash table corrupted");
                                503                 :                : 
                                504                 :                :     /* Update the transaction information in shared memory. */
  971 akapila@postgresql.o      505         [ -  + ]:CBC          27 :     SpinLockAcquire(&winfo->shared->mutex);
                                506                 :             27 :     winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
                                507                 :             27 :     winfo->shared->xid = xid;
                                508                 :             27 :     SpinLockRelease(&winfo->shared->mutex);
                                509                 :                : 
                                510                 :             27 :     winfo->in_use = true;
                                511                 :             27 :     winfo->serialize_changes = false;
                                512                 :             27 :     entry->winfo = winfo;
                                513                 :                : }
                                514                 :                : 
                                515                 :                : /*
                                516                 :                :  * Find the assigned worker for the given transaction, if any.
                                517                 :                :  */
                                518                 :                : ParallelApplyWorkerInfo *
                                519                 :         257330 : pa_find_worker(TransactionId xid)
                                520                 :                : {
                                521                 :                :     bool        found;
                                522                 :                :     ParallelApplyWorkerEntry *entry;
                                523                 :                : 
                                524         [ +  + ]:         257330 :     if (!TransactionIdIsValid(xid))
                                525                 :          80139 :         return NULL;
                                526                 :                : 
                                527         [ +  + ]:         177191 :     if (!ParallelApplyTxnHash)
                                528                 :         103240 :         return NULL;
                                529                 :                : 
                                530                 :                :     /* Return the cached parallel apply worker if valid. */
                                531         [ +  + ]:          73951 :     if (stream_apply_worker)
                                532                 :          73654 :         return stream_apply_worker;
                                533                 :                : 
                                534                 :                :     /* Find an entry for the requested transaction. */
                                535                 :            297 :     entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
                                536         [ +  - ]:            297 :     if (found)
                                537                 :                :     {
                                538                 :                :         /* The worker must not have exited.  */
                                539         [ -  + ]:            297 :         Assert(entry->winfo->in_use);
                                540                 :            297 :         return entry->winfo;
                                541                 :                :     }
                                542                 :                : 
  971 akapila@postgresql.o      543                 :UBC           0 :     return NULL;
                                544                 :                : }
                                545                 :                : 
                                546                 :                : /*
                                547                 :                :  * Makes the worker available for reuse.
                                548                 :                :  *
                                549                 :                :  * This removes the parallel apply worker entry from the hash table so that it
                                550                 :                :  * can't be used. If there are enough workers in the pool, it stops the worker
                                551                 :                :  * and frees the corresponding info. Otherwise it just marks the worker as
                                552                 :                :  * available for reuse.
                                553                 :                :  *
                                554                 :                :  * For more information about the worker pool, see comments atop this file.
                                555                 :                :  */
                                556                 :                : static void
  971 akapila@postgresql.o      557                 :CBC          24 : pa_free_worker(ParallelApplyWorkerInfo *winfo)
                                558                 :                : {
                                559         [ -  + ]:             24 :     Assert(!am_parallel_apply_worker());
                                560         [ -  + ]:             24 :     Assert(winfo->in_use);
                                561         [ -  + ]:             24 :     Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
                                562                 :                : 
                                563         [ -  + ]:             24 :     if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
  971 akapila@postgresql.o      564         [ #  # ]:UBC           0 :         elog(ERROR, "hash table corrupted");
                                565                 :                : 
                                566                 :                :     /*
                                567                 :                :      * Stop the worker if there are enough workers in the pool.
                                568                 :                :      *
                                569                 :                :      * XXX We also stop the worker if the leader apply worker
                                570                 :                :      * serializes part of the transaction data due to a send timeout. This is
                                571                 :                :      * because the message could be partially written to the queue and there
                                572                 :                :      * is no way to clean the queue other than resending the message until it
                                573                 :                :      * succeeds. Instead of trying to send the data which anyway would have
                                574                 :                :      * been serialized and then letting the parallel apply worker deal with
                                575                 :                :      * the spurious message, we stop the worker.
                                576                 :                :      */
  971 akapila@postgresql.o      577         [ +  + ]:CBC          24 :     if (winfo->serialize_changes ||
                                578                 :             20 :         list_length(ParallelApplyWorkerPool) >
                                579         [ +  + ]:             20 :         (max_parallel_apply_workers_per_subscription / 2))
                                580                 :                :     {
  851                           581                 :              5 :         logicalrep_pa_worker_stop(winfo);
  971                           582                 :              5 :         pa_free_worker_info(winfo);
                                583                 :                : 
                                584                 :              5 :         return;
                                585                 :                :     }
                                586                 :                : 
                                587                 :             19 :     winfo->in_use = false;
                                588                 :             19 :     winfo->serialize_changes = false;
                                589                 :                : }
                                590                 :                : 
                                591                 :                : /*
                                592                 :                :  * Free the parallel apply worker information and unlink the files with
                                593                 :                :  * serialized changes if any.
                                594                 :                :  */
                                595                 :                : static void
                                596                 :              5 : pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
                                597                 :                : {
                                598         [ -  + ]:              5 :     Assert(winfo);
                                599                 :                : 
                                600         [ +  - ]:              5 :     if (winfo->mq_handle)
                                601                 :              5 :         shm_mq_detach(winfo->mq_handle);
                                602                 :                : 
                                603         [ -  + ]:              5 :     if (winfo->error_mq_handle)
  971 akapila@postgresql.o      604                 :UBC           0 :         shm_mq_detach(winfo->error_mq_handle);
                                605                 :                : 
                                606                 :                :     /* Unlink the files with serialized changes. */
  971 akapila@postgresql.o      607         [ +  + ]:CBC           5 :     if (winfo->serialize_changes)
                                608                 :              4 :         stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
                                609                 :                : 
                                610         [ +  - ]:              5 :     if (winfo->dsm_seg)
                                611                 :              5 :         dsm_detach(winfo->dsm_seg);
                                612                 :                : 
                                613                 :                :     /* Remove from the worker pool. */
                                614                 :              5 :     ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
                                615                 :                : 
                                616                 :              5 :     pfree(winfo);
                                617                 :              5 : }
                                618                 :                : 
                                619                 :                : /*
                                620                 :                :  * Detach the error queue for all parallel apply workers.
                                621                 :                :  */
                                622                 :                : void
                                623                 :            320 : pa_detach_all_error_mq(void)
                                624                 :                : {
                                625                 :                :     ListCell   *lc;
                                626                 :                : 
                                627   [ +  +  +  +  :            325 :     foreach(lc, ParallelApplyWorkerPool)
                                              +  + ]
                                628                 :                :     {
                                629                 :              5 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
                                630                 :                : 
  851                           631         [ +  - ]:              5 :         if (winfo->error_mq_handle)
                                632                 :                :         {
                                633                 :              5 :             shm_mq_detach(winfo->error_mq_handle);
                                634                 :              5 :             winfo->error_mq_handle = NULL;
                                635                 :                :         }
                                636                 :                :     }
  971                           637                 :            320 : }
                                638                 :                : 
                                639                 :                : /*
                                640                 :                :  * Check if there are any pending spooled messages.
                                641                 :                :  */
                                642                 :                : static bool
                                643                 :             16 : pa_has_spooled_message_pending()
                                644                 :                : {
                                645                 :                :     PartialFileSetState fileset_state;
                                646                 :                : 
                                647                 :             16 :     fileset_state = pa_get_fileset_state();
                                648                 :                : 
                                649                 :             16 :     return (fileset_state != FS_EMPTY);
                                650                 :                : }
                                651                 :                : 
                                652                 :                : /*
                                653                 :                :  * Replay the spooled messages once the leader apply worker has finished
                                654                 :                :  * serializing changes to the file.
                                655                 :                :  *
                                656                 :                :  * Returns false if there aren't any pending spooled messages, true otherwise.
                                657                 :                :  */
                                658                 :                : static bool
                                659                 :             52 : pa_process_spooled_messages_if_required(void)
                                660                 :                : {
                                661                 :                :     PartialFileSetState fileset_state;
                                662                 :                : 
                                663                 :             52 :     fileset_state = pa_get_fileset_state();
                                664                 :                : 
                                665         [ +  + ]:             52 :     if (fileset_state == FS_EMPTY)
                                666                 :             44 :         return false;
                                667                 :                : 
                                668                 :                :     /*
                                669                 :                :      * If the leader apply worker is busy serializing the partial changes then
                                670                 :                :      * acquire the stream lock now and wait for the leader worker to finish
                                671                 :                :      * serializing the changes. Otherwise, the parallel apply worker won't get
                                672                 :                :      * a chance to receive a STREAM_STOP (and acquire the stream lock) until
                                673                 :                :      * the leader has serialized all changes, which can lead to an undetected
                                674                 :                :      * deadlock.
                                675                 :                :      *
                                676                 :                :      * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
                                677                 :                :      * worker has finished serializing the changes.
                                678                 :                :      */
                                679         [ -  + ]:              8 :     if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
                                680                 :                :     {
  971 akapila@postgresql.o      681                 :UBC           0 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
                                682                 :              0 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
                                683                 :                : 
                                684                 :              0 :         fileset_state = pa_get_fileset_state();
                                685                 :                :     }
                                686                 :                : 
                                687                 :                :     /*
                                688                 :                :      * We cannot read the file immediately after the leader has serialized all
                                689                 :                :      * changes to the file because there may still be messages in the memory
                                690                 :                :      * queue. We will apply all spooled messages the next time we call this
                                691                 :                :      * function and that will ensure there are no messages left in the memory
                                692                 :                :      * queue.
                                693                 :                :      */
  971 akapila@postgresql.o      694         [ +  + ]:CBC           8 :     if (fileset_state == FS_SERIALIZE_DONE)
                                695                 :                :     {
                                696                 :              4 :         pa_set_fileset_state(MyParallelShared, FS_READY);
                                697                 :                :     }
                                698         [ +  - ]:              4 :     else if (fileset_state == FS_READY)
                                699                 :                :     {
                                700                 :              4 :         apply_spooled_messages(&MyParallelShared->fileset,
                                701                 :              4 :                                MyParallelShared->xid,
                                702                 :                :                                InvalidXLogRecPtr);
                                703                 :              4 :         pa_set_fileset_state(MyParallelShared, FS_EMPTY);
                                704                 :                :     }
                                705                 :                : 
                                706                 :              8 :     return true;
                                707                 :                : }
                                708                 :                : 
                                709                 :                : /*
                                710                 :                :  * Interrupt handler for main loop of parallel apply worker.
                                711                 :                :  */
                                712                 :                : static void
                                713                 :          64004 : ProcessParallelApplyInterrupts(void)
                                714                 :                : {
                                715         [ +  + ]:          64004 :     CHECK_FOR_INTERRUPTS();
                                716                 :                : 
                                717         [ +  + ]:          64001 :     if (ShutdownRequestPending)
                                718                 :                :     {
                                719         [ +  - ]:              5 :         ereport(LOG,
                                720                 :                :                 (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
                                721                 :                :                         MySubscription->name)));
                                722                 :                : 
                                723                 :              5 :         proc_exit(0);
                                724                 :                :     }
                                725                 :                : 
                                726         [ +  + ]:          63996 :     if (ConfigReloadPending)
                                727                 :                :     {
                                728                 :              4 :         ConfigReloadPending = false;
                                729                 :              4 :         ProcessConfigFile(PGC_SIGHUP);
                                730                 :                :     }
                                731                 :          63996 : }
                                732                 :                : 
                                733                 :                : /* Parallel apply worker main loop. */
                                734                 :                : static void
                                735                 :             10 : LogicalParallelApplyLoop(shm_mq_handle *mqh)
                                736                 :                : {
                                737                 :                :     shm_mq_result shmq_res;
                                738                 :                :     ErrorContextCallback errcallback;
                                739                 :             10 :     MemoryContext oldcxt = CurrentMemoryContext;
                                740                 :                : 
                                741                 :                :     /*
                                742                 :                :      * Init the ApplyMessageContext which we clean up after each replication
                                743                 :                :      * protocol message.
                                744                 :                :      */
                                745                 :             10 :     ApplyMessageContext = AllocSetContextCreate(ApplyContext,
                                746                 :                :                                                 "ApplyMessageContext",
                                747                 :                :                                                 ALLOCSET_DEFAULT_SIZES);
                                748                 :                : 
                                749                 :                :     /*
                                750                 :                :      * Push apply error context callback. Fields will be filled while applying
                                751                 :                :      * a change.
                                752                 :                :      */
                                753                 :             10 :     errcallback.callback = apply_error_callback;
                                754                 :             10 :     errcallback.previous = error_context_stack;
                                755                 :             10 :     error_context_stack = &errcallback;
                                756                 :                : 
                                757                 :                :     for (;;)
                                758                 :          63994 :     {
                                759                 :                :         void       *data;
                                760                 :                :         Size        len;
                                761                 :                : 
                                762                 :          64004 :         ProcessParallelApplyInterrupts();
                                763                 :                : 
                                764                 :                :         /* Ensure we are reading the data into our memory context. */
                                765                 :          63996 :         MemoryContextSwitchTo(ApplyMessageContext);
                                766                 :                : 
                                767                 :          63996 :         shmq_res = shm_mq_receive(mqh, &len, &data, true);
                                768                 :                : 
                                769         [ +  + ]:          63996 :         if (shmq_res == SHM_MQ_SUCCESS)
                                770                 :                :         {
                                771                 :                :             StringInfoData s;
                                772                 :                :             int         c;
                                773                 :                : 
                                774         [ -  + ]:          63944 :             if (len == 0)
  971 akapila@postgresql.o      775         [ #  # ]:UBC           0 :                 elog(ERROR, "invalid message length");
                                776                 :                : 
  681 drowley@postgresql.o      777                 :CBC       63944 :             initReadOnlyStringInfo(&s, data, len);
                                778                 :                : 
                                779                 :                :             /*
                                780                 :                :              * The first byte of messages sent from leader apply worker to
                                781                 :                :              * parallel apply workers can only be PqReplMsg_WALData.
                                782                 :                :              */
  971 akapila@postgresql.o      783                 :          63944 :             c = pq_getmsgbyte(&s);
   31 nathan@postgresql.or      784         [ -  + ]:GNC       63944 :             if (c != PqReplMsg_WALData)
  971 akapila@postgresql.o      785         [ #  # ]:UBC           0 :                 elog(ERROR, "unexpected message \"%c\"", c);
                                786                 :                : 
                                787                 :                :             /*
                                788                 :                :              * Ignore statistics fields that have been updated by the leader
                                789                 :                :              * apply worker.
                                790                 :                :              *
                                791                 :                :              * XXX We can avoid sending the statistics fields from the leader
                                792                 :                :              * apply worker but for that, it needs to rebuild the entire
                                793                 :                :              * message by removing these fields which could be more work than
                                794                 :                :              * simply ignoring these fields in the parallel apply worker.
                                795                 :                :              */
  971 akapila@postgresql.o      796                 :CBC       63944 :             s.cursor += SIZE_STATS_MESSAGE;
                                797                 :                : 
                                798                 :          63944 :             apply_dispatch(&s);
                                799                 :                :         }
                                800         [ +  - ]:             52 :         else if (shmq_res == SHM_MQ_WOULD_BLOCK)
                                801                 :                :         {
                                802                 :                :             /* Replay the changes from the file, if any. */
                                803         [ +  + ]:             52 :             if (!pa_process_spooled_messages_if_required())
                                804                 :                :             {
                                805                 :                :                 int         rc;
                                806                 :                : 
                                807                 :                :                 /* Wait for more work. */
                                808                 :             44 :                 rc = WaitLatch(MyLatch,
                                809                 :                :                                WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                810                 :                :                                1000L,
                                811                 :                :                                WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
                                812                 :                : 
                                813         [ +  + ]:             44 :                 if (rc & WL_LATCH_SET)
                                814                 :             41 :                     ResetLatch(MyLatch);
                                815                 :                :             }
                                816                 :                :         }
                                817                 :                :         else
                                818                 :                :         {
  971 akapila@postgresql.o      819         [ #  # ]:UBC           0 :             Assert(shmq_res == SHM_MQ_DETACHED);
                                820                 :                : 
                                821         [ #  # ]:              0 :             ereport(ERROR,
                                822                 :                :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                823                 :                :                      errmsg("lost connection to the logical replication apply worker")));
                                824                 :                :         }
                                825                 :                : 
  971 akapila@postgresql.o      826                 :CBC       63994 :         MemoryContextReset(ApplyMessageContext);
                                827                 :          63994 :         MemoryContextSwitchTo(oldcxt);
                                828                 :                :     }
                                829                 :                : 
                                830                 :                :     /* Pop the error context stack. */
                                831                 :                :     error_context_stack = errcallback.previous;
                                832                 :                : 
                                833                 :                :     MemoryContextSwitchTo(oldcxt);
                                834                 :                : }
                                835                 :                : 
                                836                 :                : /*
                                837                 :                :  * Make sure the leader apply worker tries to read from our error queue one more
                                838                 :                :  * time. This guards against the case where we exit uncleanly without sending
                                839                 :                :  * an ErrorResponse, for example because some code calls proc_exit directly.
                                840                 :                :  *
                                841                 :                :  * Also explicitly detach from the DSM segment to invoke on_dsm_detach callbacks,
                                842                 :                :  * if any. See ParallelWorkerShutdown for details.
                                843                 :                :  */
                                844                 :                : static void
                                845                 :             10 : pa_shutdown(int code, Datum arg)
                                846                 :                : {
  962                           847                 :             10 :     SendProcSignal(MyLogicalRepWorker->leader_pid,
                                848                 :                :                    PROCSIG_PARALLEL_APPLY_MESSAGE,
                                849                 :                :                    INVALID_PROC_NUMBER);
                                850                 :                : 
  971                           851                 :             10 :     dsm_detach((dsm_segment *) DatumGetPointer(arg));
                                852                 :             10 : }
                                853                 :                : 
                                854                 :                : /*
                                855                 :                :  * Parallel apply worker entry point.
                                856                 :                :  */
                                857                 :                : void
                                858                 :             10 : ParallelApplyWorkerMain(Datum main_arg)
                                859                 :                : {
                                860                 :                :     ParallelApplyWorkerShared *shared;
                                861                 :                :     dsm_handle  handle;
                                862                 :                :     dsm_segment *seg;
                                863                 :                :     shm_toc    *toc;
                                864                 :                :     shm_mq     *mq;
                                865                 :                :     shm_mq_handle *mqh;
                                866                 :                :     shm_mq_handle *error_mqh;
                                867                 :                :     RepOriginId originid;
                                868                 :             10 :     int         worker_slot = DatumGetInt32(main_arg);
                                869                 :                :     char        originname[NAMEDATALEN];
                                870                 :                : 
  857                           871                 :             10 :     InitializingApplyWorker = true;
                                872                 :                : 
                                873                 :                :     /* Setup signal handling. */
  971                           874                 :             10 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
                                875                 :             10 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
                                876                 :             10 :     pqsignal(SIGTERM, die);
                                877                 :             10 :     BackgroundWorkerUnblockSignals();
                                878                 :                : 
                                879                 :                :     /*
                                880                 :                :      * Attach to the dynamic shared memory segment for the parallel apply, and
                                881                 :                :      * find its table of contents.
                                882                 :                :      *
                                883                 :                :      * As with parallel query, we don't need a resource owner at this point. See
                                884                 :                :      * ParallelWorkerMain.
                                885                 :                :      */
                                886                 :             10 :     memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
                                887                 :             10 :     seg = dsm_attach(handle);
                                888         [ -  + ]:             10 :     if (!seg)
  971 akapila@postgresql.o      889         [ #  # ]:UBC           0 :         ereport(ERROR,
                                890                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                891                 :                :                  errmsg("could not map dynamic shared memory segment")));
                                892                 :                : 
  971 akapila@postgresql.o      893                 :CBC          10 :     toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
                                894         [ -  + ]:             10 :     if (!toc)
  971 akapila@postgresql.o      895         [ #  # ]:UBC           0 :         ereport(ERROR,
                                896                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                897                 :                :                  errmsg("invalid magic number in dynamic shared memory segment")));
                                898                 :                : 
                                899                 :                :     /* Look up the shared information. */
  971 akapila@postgresql.o      900                 :CBC          10 :     shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
                                901                 :             10 :     MyParallelShared = shared;
                                902                 :                : 
                                903                 :                :     /*
                                904                 :                :      * Attach to the message queue.
                                905                 :                :      */
                                906                 :             10 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
                                907                 :             10 :     shm_mq_set_receiver(mq, MyProc);
                                908                 :             10 :     mqh = shm_mq_attach(mq, seg, NULL);
                                909                 :                : 
                                910                 :                :     /*
                                911                 :                :      * Primary initialization is complete. Now, we can attach to our slot.
                                912                 :                :      * This is to ensure that the leader apply worker does not write data to
                                 913                 :                :      * the uninitialized shared-memory queue.
                                914                 :                :      */
                                915                 :             10 :     logicalrep_worker_attach(worker_slot);
                                916                 :                : 
                                917                 :                :     /*
                                918                 :                :      * Register the shutdown callback after we are attached to the worker
                                919                 :                :      * slot. This is to ensure that MyLogicalRepWorker remains valid when this
                                920                 :                :      * callback is invoked.
                                921                 :                :      */
  851                           922                 :             10 :     before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
                                923                 :                : 
  971                           924         [ -  + ]:             10 :     SpinLockAcquire(&MyParallelShared->mutex);
                                925                 :             10 :     MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
                                926                 :             10 :     MyParallelShared->logicalrep_worker_slot_no = worker_slot;
                                927                 :             10 :     SpinLockRelease(&MyParallelShared->mutex);
                                928                 :                : 
                                929                 :                :     /*
                                930                 :                :      * Attach to the error queue.
                                931                 :                :      */
                                932                 :             10 :     mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
                                933                 :             10 :     shm_mq_set_sender(mq, MyProc);
                                934                 :             10 :     error_mqh = shm_mq_attach(mq, seg, NULL);
                                935                 :                : 
                                936                 :             10 :     pq_redirect_to_shm_mq(seg, error_mqh);
  962                           937                 :             10 :     pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
                                938                 :                :                            INVALID_PROC_NUMBER);
                                939                 :                : 
  971                           940                 :             10 :     MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
                                941                 :             10 :         MyLogicalRepWorker->reply_time = 0;
                                942                 :                : 
  765                           943                 :             10 :     InitializeLogRepWorker();
                                944                 :                : 
  857                           945                 :             10 :     InitializingApplyWorker = false;
                                946                 :                : 
                                 947                 :                :     /* Set up replication origin tracking. */
  971                           948                 :             10 :     StartTransactionCommand();
                                949                 :             10 :     ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
                                950                 :                :                                        originname, sizeof(originname));
                                951                 :             10 :     originid = replorigin_by_name(originname, false);
                                952                 :                : 
                                953                 :                :     /*
                                954                 :                :      * The parallel apply worker doesn't need to monopolize this replication
                                955                 :                :      * origin which was already acquired by its leader process.
                                956                 :                :      */
  962                           957                 :             10 :     replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
  971                           958                 :             10 :     replorigin_session_origin = originid;
                                959                 :             10 :     CommitTransactionCommand();
                                960                 :                : 
                                961                 :                :     /*
                                 962                 :                :      * Set up a callback for syscache so that we know when something changes in
                                963                 :                :      * the subscription relation state.
                                964                 :                :      */
                                965                 :             10 :     CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
                                966                 :                :                                   invalidate_syncing_table_states,
                                967                 :                :                                   (Datum) 0);
                                968                 :                : 
                                969                 :             10 :     set_apply_error_context_origin(originname);
                                970                 :                : 
                                971                 :             10 :     LogicalParallelApplyLoop(mqh);
                                972                 :                : 
                                973                 :                :     /*
                                974                 :                :      * The parallel apply worker must not get here because the parallel apply
                                975                 :                :      * worker will only stop when it receives a SIGTERM or SIGINT from the
                                976                 :                :      * leader, or when there is an error. None of these cases will allow the
                                977                 :                :      * code to reach here.
                                978                 :                :      */
  971 akapila@postgresql.o      979                 :UBC           0 :     Assert(false);
                                980                 :                : }
                                981                 :                : 
                                982                 :                : /*
                                983                 :                :  * Handle receipt of an interrupt indicating a parallel apply worker message.
                                984                 :                :  *
                                985                 :                :  * Note: this is called within a signal handler! All we can do is set a flag
                                986                 :                :  * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
                                987                 :                :  * ProcessParallelApplyMessages().
                                988                 :                :  */
                                989                 :                : void
  971 akapila@postgresql.o      990                 :CBC          13 : HandleParallelApplyMessageInterrupt(void)
                                991                 :                : {
                                992                 :             13 :     InterruptPending = true;
                                993                 :             13 :     ParallelApplyMessagePending = true;
                                994                 :             13 :     SetLatch(MyLatch);
                                995                 :             13 : }
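
The handler above does nothing except set flags and wake the process; the real work is deferred to the next interrupt check in normal context. A minimal standalone sketch of that deferral pattern follows, using only standard C signals. The names `interrupt_pending`, `message_pending`, `install_handler`, and `check_for_interrupts` are hypothetical stand-ins for this illustration, not PostgreSQL APIs, and the latch wake-up is omitted.

```c
#include <signal.h>

/* Hypothetical stand-ins for InterruptPending / ParallelApplyMessagePending. */
volatile sig_atomic_t interrupt_pending = 0;
volatile sig_atomic_t message_pending = 0;

/* Counts how many times the deferred work ran (illustration only). */
int messages_processed = 0;

/*
 * Signal handler: only sets flags. No allocation, no I/O, no locking,
 * because none of those are safe inside a signal handler.
 */
void
handle_message_interrupt(int signo)
{
    (void) signo;
    interrupt_pending = 1;
    message_pending = 1;
}

/* Install the handler for SIGINT (chosen only because it is standard C). */
void
install_handler(void)
{
    signal(SIGINT, handle_message_interrupt);
}

/*
 * Called from normal (non-signal) context at safe points. Returns 1 if
 * deferred message processing ran, 0 otherwise.
 */
int
check_for_interrupts(void)
{
    if (!interrupt_pending)
        return 0;

    interrupt_pending = 0;

    if (message_pending)
    {
        message_pending = 0;
        messages_processed++;   /* stand-in for the real message processing */
        return 1;
    }
    return 0;
}
```

Calling `install_handler()`, then `raise(SIGINT)`, then `check_for_interrupts()` runs the deferred work once; a second call finds nothing pending.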
                                996                 :                : 
                                997                 :                : /*
                                998                 :                :  * Process a single protocol message received from a single parallel apply
                                999                 :                :  * worker.
                               1000                 :                :  */
                               1001                 :                : static void
  185 heikki.linnakangas@i     1002                 :              2 : ProcessParallelApplyMessage(StringInfo msg)
                               1003                 :                : {
                               1004                 :                :     char        msgtype;
                               1005                 :                : 
  971 akapila@postgresql.o     1006                 :              2 :     msgtype = pq_getmsgbyte(msg);
                               1007                 :                : 
                               1008      [ +  -  - ]:              2 :     switch (msgtype)
                               1009                 :                :     {
   12 nathan@postgresql.or     1010                 :GNC           2 :         case PqMsg_ErrorResponse:
                               1011                 :                :             {
                               1012                 :                :                 ErrorData   edata;
                               1013                 :                : 
                               1014                 :                :                 /* Parse ErrorResponse. */
  971 akapila@postgresql.o     1015                 :CBC           2 :                 pq_parse_errornotice(msg, &edata);
                               1016                 :                : 
                               1017                 :                :                 /*
                               1018                 :                :                  * If desired, add a context line to show that this is a
                               1019                 :                :                  * message propagated from a parallel apply worker. Otherwise,
                               1020                 :                :                  * it can sometimes be confusing to understand what actually
                               1021                 :                :                  * happened.
                               1022                 :                :                  */
                               1023         [ +  - ]:              2 :                 if (edata.context)
                               1024                 :              2 :                     edata.context = psprintf("%s\n%s", edata.context,
                               1025                 :                :                                              _("logical replication parallel apply worker"));
                               1026                 :                :                 else
  971 akapila@postgresql.o     1027                 :UBC           0 :                     edata.context = pstrdup(_("logical replication parallel apply worker"));
                               1028                 :                : 
                               1029                 :                :                 /*
                               1030                 :                :                  * Context beyond that should use the error context callbacks
                               1031                 :                :                  * that were in effect in LogicalRepApplyLoop().
                               1032                 :                :                  */
  971 akapila@postgresql.o     1033                 :CBC           2 :                 error_context_stack = apply_error_context_stack;
                               1034                 :                : 
                               1035                 :                :                 /*
                               1036                 :                :                  * The actual error must have been reported by the parallel
                               1037                 :                :                  * apply worker.
                               1038                 :                :                  */
                               1039         [ +  - ]:              2 :                 ereport(ERROR,
                               1040                 :                :                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1041                 :                :                          errmsg("logical replication parallel apply worker exited due to error"),
                               1042                 :                :                          errcontext("%s", edata.context)));
                               1043                 :                :             }
                               1044                 :                : 
                               1045                 :                :             /*
                               1046                 :                :              * Don't need to do anything about NoticeResponse and
                               1047                 :                :              * NotificationResponse as the logical replication worker doesn't
                               1048                 :                :              * need to send messages to the client.
                               1049                 :                :              */
   12 nathan@postgresql.or     1050                 :UNC           0 :         case PqMsg_NoticeResponse:
                               1051                 :                :         case PqMsg_NotificationResponse:
  971 akapila@postgresql.o     1052                 :UBC           0 :             break;
                               1053                 :                : 
                               1054                 :              0 :         default:
                               1055         [ #  # ]:              0 :             elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
                               1056                 :                :                  msgtype, msg->len);
                               1057                 :                :     }
                               1058                 :              0 : }
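
The ErrorResponse branch above appends a fixed line to the propagated error context, or uses that line alone when the worker sent no context. A small standalone sketch of the same string handling is shown here with plain `malloc` instead of PostgreSQL's `psprintf`/`pstrdup`; the function name `append_context_line` is an assumption made for this example.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Return a newly allocated string holding "existing\nline", or a copy of
 * "line" alone when existing is NULL. The caller frees the result.
 * Returns NULL if allocation fails.
 */
char *
append_context_line(const char *existing, const char *line)
{
    size_t      len;
    char       *result;

    if (existing == NULL)
    {
        /* No prior context: the new line becomes the whole context. */
        len = strlen(line) + 1;
        result = malloc(len);
        if (result != NULL)
            memcpy(result, line, len);
        return result;
    }

    /* Prior context, newline separator, new line, terminating NUL. */
    len = strlen(existing) + 1 + strlen(line) + 1;
    result = malloc(len);
    if (result != NULL)
        snprintf(result, len, "%s\n%s", existing, line);
    return result;
}
```

Putting the propagated context first and the fixed line last keeps the worker's own context readable while still marking where the error came from.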
                               1059                 :                : 
                               1060                 :                : /*
                               1061                 :                :  * Handle any queued protocol messages received from parallel apply workers.
                               1062                 :                :  */
                               1063                 :                : void
  185 heikki.linnakangas@i     1064                 :CBC           7 : ProcessParallelApplyMessages(void)
                               1065                 :                : {
                               1066                 :                :     ListCell   *lc;
                               1067                 :                :     MemoryContext oldcontext;
                               1068                 :                : 
                               1069                 :                :     static MemoryContext hpam_context = NULL;
                               1070                 :                : 
                               1071                 :                :     /*
                               1072                 :                :      * This is invoked from ProcessInterrupts(), and since some of the
                               1073                 :                :      * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
                               1074                 :                :      * for recursive calls if more signals are received while this runs. It's
                               1075                 :                :      * unclear that recursive entry would be safe, and it doesn't seem useful
                               1076                 :                :      * even if it is safe, so let's block interrupts until done.
                               1077                 :                :      */
  971 akapila@postgresql.o     1078                 :              7 :     HOLD_INTERRUPTS();
                               1079                 :                : 
                               1080                 :                :     /*
                               1081                 :                :      * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
                               1082                 :                :      * don't want to risk leaking data into long-lived contexts, so let's do
                               1083                 :                :      * our work here in a private context that we can reset on each use.
                               1084                 :                :      */
                               1085         [ +  + ]:              7 :     if (!hpam_context)          /* first time through? */
                               1086                 :              6 :         hpam_context = AllocSetContextCreate(TopMemoryContext,
                               1087                 :                :                                              "ProcessParallelApplyMessages",
                               1088                 :                :                                              ALLOCSET_DEFAULT_SIZES);
                               1089                 :                :     else
                               1090                 :              1 :         MemoryContextReset(hpam_context);
                               1091                 :                : 
                               1092                 :              7 :     oldcontext = MemoryContextSwitchTo(hpam_context);
                               1093                 :                : 
                               1094                 :              7 :     ParallelApplyMessagePending = false;
                               1095                 :                : 
                               1096   [ +  -  +  +  :             13 :     foreach(lc, ParallelApplyWorkerPool)
                                              +  + ]
                               1097                 :                :     {
                               1098                 :                :         shm_mq_result res;
                               1099                 :                :         Size        nbytes;
                               1100                 :                :         void       *data;
                               1101                 :              8 :         ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
                               1102                 :                : 
                               1103                 :                :         /*
                               1104                 :                :          * The leader will detach from the error queue and set it to NULL
                               1105                 :                :          * before preparing to stop all parallel apply workers, so we don't
                               1106                 :                :          * need to handle error messages anymore. See
                               1107                 :                :          * logicalrep_worker_detach.
                               1108                 :                :          */
                               1109         [ +  + ]:              8 :         if (!winfo->error_mq_handle)
                               1110                 :              6 :             continue;
                               1111                 :                : 
                               1112                 :              3 :         res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
                               1113                 :                : 
                               1114         [ +  + ]:              3 :         if (res == SHM_MQ_WOULD_BLOCK)
                               1115                 :              1 :             continue;
                               1116         [ +  - ]:              2 :         else if (res == SHM_MQ_SUCCESS)
                               1117                 :                :         {
                               1118                 :                :             StringInfoData msg;
                               1119                 :                : 
                               1120                 :              2 :             initStringInfo(&msg);
                               1121                 :              2 :             appendBinaryStringInfo(&msg, data, nbytes);
  185 heikki.linnakangas@i     1122                 :              2 :             ProcessParallelApplyMessage(&msg);
  971 akapila@postgresql.o     1123                 :UBC           0 :             pfree(msg.data);
                               1124                 :                :         }
                               1125                 :                :         else
                               1126         [ #  # ]:              0 :             ereport(ERROR,
                               1127                 :                :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1128                 :                :                      errmsg("lost connection to the logical replication parallel apply worker")));
                               1129                 :                :     }
                               1130                 :                : 
  971 akapila@postgresql.o     1131                 :CBC           5 :     MemoryContextSwitchTo(oldcontext);
                               1132                 :                : 
                               1133                 :                :     /* Might as well clear the context on our way out */
                               1134                 :              5 :     MemoryContextReset(hpam_context);
                               1135                 :                : 
                               1136         [ -  + ]:              5 :     RESUME_INTERRUPTS();
                               1137                 :              5 : }
                               1138                 :                : 
                               1139                 :                : /*
                               1140                 :                :  * Send the data to the specified parallel apply worker via shared-memory
                               1141                 :                :  * queue.
                               1142                 :                :  *
                               1143                 :                :  * Returns false if the attempt to send data via shared memory times out, true
                               1144                 :                :  * otherwise.
                               1145                 :                :  */
                               1146                 :                : bool
                               1147                 :          68914 : pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
                               1148                 :                : {
                               1149                 :                :     int         rc;
                               1150                 :                :     shm_mq_result result;
                               1151                 :          68914 :     TimestampTz startTime = 0;
                               1152                 :                : 
                               1153         [ -  + ]:          68914 :     Assert(!IsTransactionState());
                               1154         [ -  + ]:          68914 :     Assert(!winfo->serialize_changes);
                               1155                 :                : 
                               1156                 :                :     /*
                               1157                 :                :      * We don't try to send data to parallel worker for 'immediate' mode. This
                               1158                 :                :      * is primarily used for testing purposes.
                               1159                 :                :      */
  739 peter@eisentraut.org     1160         [ +  + ]:          68914 :     if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
  947 akapila@postgresql.o     1161                 :              4 :         return false;
                               1162                 :                : 
                               1163                 :                : /*
                               1164                 :                :  * This timeout is a bit arbitrary but testing revealed that it is sufficient
                               1165                 :                :  * to send the message unless the parallel apply worker is waiting on some
                               1166                 :                :  * lock or there is a serious resource crunch. See the comments atop this file
                                1167                 :                :  * for the reason we send the message in a non-blocking way.
                               1168                 :                :  */
                               1169                 :                : #define SHM_SEND_RETRY_INTERVAL_MS 1000
                               1170                 :                : #define SHM_SEND_TIMEOUT_MS     (10000 - SHM_SEND_RETRY_INTERVAL_MS)
                               1171                 :                : 
                               1172                 :                :     for (;;)
                               1173                 :                :     {
  971                          1174                 :          68910 :         result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
                               1175                 :                : 
                               1176         [ +  - ]:          68910 :         if (result == SHM_MQ_SUCCESS)
                               1177                 :          68910 :             return true;
  971 akapila@postgresql.o     1178         [ #  # ]:UBC           0 :         else if (result == SHM_MQ_DETACHED)
                               1179         [ #  # ]:              0 :             ereport(ERROR,
                               1180                 :                :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1181                 :                :                      errmsg("could not send data to shared-memory queue")));
                               1182                 :                : 
                               1183         [ #  # ]:              0 :         Assert(result == SHM_MQ_WOULD_BLOCK);
                               1184                 :                : 
                               1185                 :                :         /* Wait before retrying. */
                               1186                 :              0 :         rc = WaitLatch(MyLatch,
                               1187                 :                :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1188                 :                :                        SHM_SEND_RETRY_INTERVAL_MS,
                               1189                 :                :                        WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
                               1190                 :                : 
                               1191         [ #  # ]:              0 :         if (rc & WL_LATCH_SET)
                               1192                 :                :         {
                               1193                 :              0 :             ResetLatch(MyLatch);
                               1194         [ #  # ]:              0 :             CHECK_FOR_INTERRUPTS();
                               1195                 :                :         }
                               1196                 :                : 
                               1197         [ #  # ]:              0 :         if (startTime == 0)
                               1198                 :              0 :             startTime = GetCurrentTimestamp();
                               1199         [ #  # ]:              0 :         else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
                               1200                 :                :                                             SHM_SEND_TIMEOUT_MS))
                               1201                 :              0 :             return false;
                               1202                 :                :     }
                               1203                 :                : }
                               1204                 :                : 
                               1205                 :                : /*
                               1206                 :                :  * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
                               1207                 :                :  * that the current data and any subsequent data for this transaction will be
                               1208                 :                :  * serialized to a file. This is done to prevent possible deadlocks with
                               1209                 :                :  * another parallel apply worker (refer to the comments atop this file).
                               1210                 :                :  */
                               1211                 :                : void
  971 akapila@postgresql.o     1212                 :CBC           4 : pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                               1213                 :                :                                bool stream_locked)
                               1214                 :                : {
  947                          1215         [ +  - ]:              4 :     ereport(LOG,
                               1216                 :                :             (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
                               1217                 :                :                     winfo->shared->xid)));
                               1218                 :                : 
                               1219                 :                :     /*
                               1220                 :                :      * The parallel apply worker could be stuck for some reason (say, waiting
                               1221                 :                :      * on a lock held by another backend), so stop trying to send data directly
                               1222                 :                :      * to it and start serializing data to the file instead.
                               1223                 :                :      */
  971                          1224                 :              4 :     winfo->serialize_changes = true;
                               1225                 :                : 
                               1226                 :                :     /* Initialize the stream fileset. */
                               1227                 :              4 :     stream_start_internal(winfo->shared->xid, true);
                               1228                 :                : 
                               1229                 :                :     /*
                               1230                 :                :      * Acquire the stream lock, if not already held, to make sure that the
                               1231                 :                :      * parallel apply worker will wait for the leader to release the stream
                               1232                 :                :      * lock until the end of the transaction.
                               1233                 :                :      */
                               1234         [ +  - ]:              4 :     if (!stream_locked)
                               1235                 :              4 :         pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
                               1236                 :                : 
                               1237                 :              4 :     pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
                               1238                 :              4 : }
                               1239                 :                : 
                               1240                 :                : /*
                               1241                 :                :  * Wait until the parallel apply worker's transaction state has reached or
                               1242                 :                :  * exceeded the given xact_state.
                               1243                 :                :  */
                               1244                 :                : static void
                               1245                 :             25 : pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
                               1246                 :                :                        ParallelTransState xact_state)
                               1247                 :                : {
                               1248                 :                :     for (;;)
                               1249                 :                :     {
                               1250                 :                :         /*
                               1251                 :                :          * Stop if the transaction state has reached or exceeded the given
                               1252                 :                :          * xact_state.
                               1253                 :                :          */
                               1254         [ +  + ]:            286 :         if (pa_get_xact_state(winfo->shared) >= xact_state)
                               1255                 :             25 :             break;
                               1256                 :                : 
                               1257                 :                :         /* Wait to be signalled. */
                               1258                 :            261 :         (void) WaitLatch(MyLatch,
                               1259                 :                :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                               1260                 :                :                          10L,
                               1261                 :                :                          WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
                               1262                 :                : 
                               1263                 :                :         /* Reset the latch so we don't spin. */
                               1264                 :            261 :         ResetLatch(MyLatch);
                               1265                 :                : 
                               1266                 :                :         /* An interrupt may have occurred while we were waiting. */
                               1267         [ -  + ]:            261 :         CHECK_FOR_INTERRUPTS();
                               1268                 :                :     }
                               1269                 :             25 : }
                               1270                 :                : 
                               1271                 :                : /*
                               1272                 :                :  * Wait until the parallel apply worker's transaction finishes.
                               1273                 :                :  */
                               1274                 :                : static void
                               1275                 :             25 : pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
                               1276                 :                : {
                               1277                 :                :     /*
                               1278                 :                :      * Wait until the parallel apply worker sets the state to
                               1279                 :                :      * PARALLEL_TRANS_STARTED, which means it has acquired the transaction
                               1280                 :                :      * lock. This is to prevent the leader apply worker from acquiring the
                               1281                 :                :      * transaction lock earlier than the parallel apply worker.
                               1282                 :                :      */
                               1283                 :             25 :     pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
                               1284                 :                : 
                               1285                 :                :     /*
                               1286                 :                :      * Wait for the transaction lock to be released. This is required to
                               1287                 :                :      * detect deadlock among leader and parallel apply workers. Refer to the
                               1288                 :                :      * comments atop this file.
                               1289                 :                :      */
                               1290                 :             25 :     pa_lock_transaction(winfo->shared->xid, AccessShareLock);
                               1291                 :             24 :     pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
                               1292                 :                : 
                               1293                 :                :     /*
                               1294                 :                :      * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
                               1295                 :                :      * apply worker failed while applying changes causing the lock to be
                               1296                 :                :      * released.
                               1297                 :                :      */
                               1298         [ -  + ]:             24 :     if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
  971 akapila@postgresql.o     1299         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1300                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1301                 :                :                  errmsg("lost connection to the logical replication parallel apply worker")));
  971 akapila@postgresql.o     1302                 :CBC          24 : }
                               1303                 :                : 
                               1304                 :                : /*
                               1305                 :                :  * Set the transaction state for a given parallel apply worker.
                               1306                 :                :  */
                               1307                 :                : void
                               1308                 :             51 : pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                               1309                 :                :                   ParallelTransState xact_state)
                               1310                 :                : {
                               1311         [ -  + ]:             51 :     SpinLockAcquire(&wshared->mutex);
                               1312                 :             51 :     wshared->xact_state = xact_state;
                               1313                 :             51 :     SpinLockRelease(&wshared->mutex);
                               1314                 :             51 : }
                               1315                 :                : 
                               1316                 :                : /*
                               1317                 :                :  * Get the transaction state for a given parallel apply worker.
                               1318                 :                :  */
                               1319                 :                : static ParallelTransState
                               1320                 :            334 : pa_get_xact_state(ParallelApplyWorkerShared *wshared)
                               1321                 :                : {
                               1322                 :                :     ParallelTransState xact_state;
                               1323                 :                : 
                               1324         [ -  + ]:            334 :     SpinLockAcquire(&wshared->mutex);
                               1325                 :            334 :     xact_state = wshared->xact_state;
                               1326                 :            334 :     SpinLockRelease(&wshared->mutex);
                               1327                 :                : 
                               1328                 :            334 :     return xact_state;
                               1329                 :                : }
                               1330                 :                : 
                               1331                 :                : /*
                               1332                 :                :  * Cache the parallel apply worker information.
                               1333                 :                :  */
                               1334                 :                : void
                               1335                 :            524 : pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
                               1336                 :                : {
                               1337                 :            524 :     stream_apply_worker = winfo;
                               1338                 :            524 : }
                               1339                 :                : 
                               1340                 :                : /*
                               1341                 :                :  * Form a unique savepoint name for the streaming transaction.
                               1342                 :                :  *
                               1343                 :                :  * Note that different subscriptions for publications on different nodes can
                               1344                 :                :  * receive the same remote xid, so we use the subscription id along with it.
                               1345                 :                :  *
                               1346                 :                :  * Returns the name in the supplied buffer.
                               1347                 :                :  */
                               1348                 :                : static void
                               1349                 :             27 : pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
                               1350                 :                : {
                               1351                 :             27 :     snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
                               1352                 :             27 : }
                               1353                 :                : 
                               1354                 :                : /*
                               1355                 :                :  * Define a savepoint for a subxact in the parallel apply worker if needed.
                               1356                 :                :  *
                               1357                 :                :  * The parallel apply worker can figure out if a new subtransaction was
                               1358                 :                :  * started by checking if the new change arrived with a different xid. In that
                               1359                 :                :  * case define a named savepoint, so that we are able to roll back to it
                               1360                 :                :  * if required.
                               1361                 :                :  */
                               1362                 :                : void
                               1363                 :          68448 : pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
                               1364                 :                : {
                               1365         [ +  + ]:          68448 :     if (current_xid != top_xid &&
                               1366         [ +  + ]:             52 :         !list_member_xid(subxactlist, current_xid))
                               1367                 :                :     {
                               1368                 :                :         MemoryContext oldctx;
                               1369                 :                :         char        spname[NAMEDATALEN];
                               1370                 :                : 
                               1371                 :             17 :         pa_savepoint_name(MySubscription->oid, current_xid,
                               1372                 :                :                           spname, sizeof(spname));
                               1373                 :                : 
                               1374         [ +  + ]:             17 :         elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
                               1375                 :                : 
                               1376                 :                :         /* We must be in a transaction block to define the SAVEPOINT. */
                               1377         [ +  + ]:             17 :         if (!IsTransactionBlock())
                               1378                 :                :         {
                               1379         [ -  + ]:              5 :             if (!IsTransactionState())
  971 akapila@postgresql.o     1380                 :UBC           0 :                 StartTransactionCommand();
                               1381                 :                : 
  971 akapila@postgresql.o     1382                 :CBC           5 :             BeginTransactionBlock();
                               1383                 :              5 :             CommitTransactionCommand();
                               1384                 :                :         }
                               1385                 :                : 
                               1386                 :             17 :         DefineSavepoint(spname);
                               1387                 :                : 
                               1388                 :                :         /*
                               1389                 :                :          * CommitTransactionCommand is needed to start a subtransaction after
                               1390                 :                :          * issuing a SAVEPOINT inside a transaction block (see
                               1391                 :                :          * StartSubTransaction()).
                               1392                 :                :          */
                               1393                 :             17 :         CommitTransactionCommand();
                               1394                 :                : 
                               1395                 :             17 :         oldctx = MemoryContextSwitchTo(TopTransactionContext);
                               1396                 :             17 :         subxactlist = lappend_xid(subxactlist, current_xid);
                               1397                 :             17 :         MemoryContextSwitchTo(oldctx);
                               1398                 :                :     }
                               1399                 :          68448 : }
                               1400                 :                : 
                               1401                 :                : /* Reset the list that maintains subtransactions. */
                               1402                 :                : void
                               1403                 :             24 : pa_reset_subtrans(void)
                               1404                 :                : {
                               1405                 :                :     /*
                               1406                 :                :      * We don't need to free this explicitly as the allocated memory will be
                               1407                 :                :      * freed at the transaction end.
                               1408                 :                :      */
                               1409                 :             24 :     subxactlist = NIL;
                               1410                 :             24 : }
                               1411                 :                : 
                               1412                 :                : /*
                               1413                 :                :  * Handle STREAM ABORT message when the transaction was applied in a parallel
                               1414                 :                :  * apply worker.
                               1415                 :                :  */
                               1416                 :                : void
                               1417                 :             12 : pa_stream_abort(LogicalRepStreamAbortData *abort_data)
                               1418                 :                : {
                               1419                 :             12 :     TransactionId xid = abort_data->xid;
                               1420                 :             12 :     TransactionId subxid = abort_data->subxid;
                               1421                 :                : 
                               1422                 :                :     /*
                               1423                 :                :      * Update origin state so we can restart streaming from the correct
                               1424                 :                :      * position in case of a crash.
                               1425                 :                :      */
                               1426                 :             12 :     replorigin_session_origin_lsn = abort_data->abort_lsn;
                               1427                 :             12 :     replorigin_session_origin_timestamp = abort_data->abort_time;
                               1428                 :                : 
                               1429                 :                :     /*
                               1430                 :                :      * If the two XIDs are the same, it is in fact an abort of the toplevel
                               1431                 :                :      * xact, so just free the subxactlist.
                               1432                 :                :      */
                               1433         [ +  + ]:             12 :     if (subxid == xid)
                               1434                 :                :     {
                               1435                 :              2 :         pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
                               1436                 :                : 
                               1437                 :                :         /*
                               1438                 :                :          * Release the lock as we might be processing an empty streaming
                               1439                 :                :          * transaction in which case the lock won't be released during
                               1440                 :                :          * transaction rollback.
                               1441                 :                :          *
                               1442                 :                :          * Note that it's ok to release the transaction lock before aborting
                               1443                 :                :          * the transaction because even if the parallel apply worker dies due
                               1444                 :                :          * to crash or some other reason, such a transaction would still be
                               1445                 :                :          * considered aborted.
                               1446                 :                :          */
                               1447                 :              2 :         pa_unlock_transaction(xid, AccessExclusiveLock);
                               1448                 :                : 
                               1449                 :              2 :         AbortCurrentTransaction();
                               1450                 :                : 
                               1451         [ +  + ]:              2 :         if (IsTransactionBlock())
                               1452                 :                :         {
                               1453                 :              1 :             EndTransactionBlock(false);
                               1454                 :              1 :             CommitTransactionCommand();
                               1455                 :                :         }
                               1456                 :                : 
                               1457                 :              2 :         pa_reset_subtrans();
                               1458                 :                : 
                               1459                 :              2 :         pgstat_report_activity(STATE_IDLE, NULL);
                               1460                 :                :     }
                               1461                 :                :     else
                               1462                 :                :     {
                               1463                 :                :         /* OK, so it's a subxact. Roll back to the savepoint. */
                               1464                 :                :         int         i;
                               1465                 :                :         char        spname[NAMEDATALEN];
                               1466                 :                : 
                               1467                 :             10 :         pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
                               1468                 :                : 
                               1469         [ +  + ]:             10 :         elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
                               1470                 :                : 
                               1471                 :                :         /*
                               1472                 :                :          * Search the subxactlist, determine the offset tracked for the
                               1473                 :                :          * subxact, and truncate the list.
                               1474                 :                :          *
                               1475                 :                :          * Note that for an empty sub-transaction we won't find the subxid
                               1476                 :                :          * here.
                               1477                 :                :          */
                               1478         [ +  + ]:             12 :         for (i = list_length(subxactlist) - 1; i >= 0; i--)
                               1479                 :                :         {
                               1480                 :             11 :             TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
                               1481                 :                : 
                               1482         [ +  + ]:             11 :             if (xid_tmp == subxid)
                               1483                 :                :             {
                               1484                 :              9 :                 RollbackToSavepoint(spname);
                               1485                 :              9 :                 CommitTransactionCommand();
                               1486                 :              9 :                 subxactlist = list_truncate(subxactlist, i);
                               1487                 :              9 :                 break;
                               1488                 :                :             }
                               1489                 :                :         }
                               1490                 :                :     }
                               1491                 :             12 : }
                               1492                 :                : 
                               1493                 :                : /*
                               1494                 :                :  * Set the fileset state for a particular parallel apply worker. The fileset
                               1495                 :                :  * will be set once the leader worker has serialized all changes to the file,
                               1496                 :                :  * so that it can be used by the parallel apply worker.
                               1497                 :                :  */
                               1498                 :                : void
                               1499                 :             16 : pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                               1500                 :                :                      PartialFileSetState fileset_state)
                               1501                 :                : {
                               1502         [ -  + ]:             16 :     SpinLockAcquire(&wshared->mutex);
                               1503                 :             16 :     wshared->fileset_state = fileset_state;
                               1504                 :                : 
                               1505         [ +  + ]:             16 :     if (fileset_state == FS_SERIALIZE_DONE)
                               1506                 :                :     {
                               1507         [ -  + ]:              4 :         Assert(am_leader_apply_worker());
                               1508         [ -  + ]:              4 :         Assert(MyLogicalRepWorker->stream_fileset);
                               1509                 :              4 :         wshared->fileset = *MyLogicalRepWorker->stream_fileset;
                               1510                 :                :     }
                               1511                 :                : 
                               1512                 :             16 :     SpinLockRelease(&wshared->mutex);
                               1513                 :             16 : }
                               1514                 :                : 
                               1515                 :                : /*
                               1516                 :                :  * Get the fileset state for the current parallel apply worker.
                               1517                 :                :  */
                               1518                 :                : static PartialFileSetState
                               1519                 :             68 : pa_get_fileset_state(void)
                               1520                 :                : {
                               1521                 :                :     PartialFileSetState fileset_state;
                               1522                 :                : 
                               1523         [ -  + ]:             68 :     Assert(am_parallel_apply_worker());
                               1524                 :                : 
                               1525         [ -  + ]:             68 :     SpinLockAcquire(&MyParallelShared->mutex);
                               1526                 :             68 :     fileset_state = MyParallelShared->fileset_state;
                               1527                 :             68 :     SpinLockRelease(&MyParallelShared->mutex);
                               1528                 :                : 
                               1529                 :             68 :     return fileset_state;
                               1530                 :                : }
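
The setter and getter above both touch the shared state only while holding the per-worker mutex, and the setter copies the extra payload only on the final state. The following is a minimal, standalone sketch of that pattern, assuming a C11 `atomic_flag` as a stand-in for the PostgreSQL spinlock. `DemoShared`, `DemoState`, `demo_init`, `demo_set_state`, and `demo_get_state` are hypothetical names for illustration and are not part of applyparallelworker.c.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical states, loosely mirroring a "serialize done" style state machine. */
typedef enum
{
    STATE_EMPTY,
    STATE_IN_PROGRESS,
    STATE_DONE
} DemoState;

/* Hypothetical shared struct: a flag used as a spinlock plus the guarded fields. */
typedef struct
{
    atomic_flag mutex;
    DemoState   state;
    int         payload;    /* only published together with STATE_DONE */
} DemoShared;

static void
demo_init(DemoShared *s)
{
    assert(s != NULL);
    atomic_flag_clear(&s->mutex);
    s->state = STATE_EMPTY;
    s->payload = 0;
}

/* Set the state under the lock; copy the payload only for the final state. */
static void
demo_set_state(DemoShared *s, DemoState state, int payload)
{
    while (atomic_flag_test_and_set_explicit(&s->mutex, memory_order_acquire))
        ;                   /* spin until the flag is released */

    s->state = state;
    if (state == STATE_DONE)
        s->payload = payload;

    atomic_flag_clear_explicit(&s->mutex, memory_order_release);
}

/* Read the state under the same lock and return a local copy. */
static DemoState
demo_get_state(DemoShared *s)
{
    DemoState   state;

    while (atomic_flag_test_and_set_explicit(&s->mutex, memory_order_acquire))
        ;
    state = s->state;
    atomic_flag_clear_explicit(&s->mutex, memory_order_release);

    return state;
}
```

Returning a local copy keeps the critical section to a single read, which is the same shape as the getter in the listing.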
                               1531                 :                : 
                               1532                 :                : /*
                               1533                 :                :  * Helper functions to acquire and release a lock for each stream block.
                               1534                 :                :  *
                               1535                 :                :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
                               1536                 :                :  * stream lock.
                               1537                 :                :  *
                               1538                 :                :  * Refer to the comments atop this file to see how the stream lock is used.
                               1539                 :                :  */
                               1540                 :                : void
                               1541                 :            284 : pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
                               1542                 :                : {
                               1543                 :            284 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                               1544                 :                :                                    PARALLEL_APPLY_LOCK_STREAM, lockmode);
                               1545                 :            282 : }
                               1546                 :                : 
                               1547                 :                : void
                               1548                 :            280 : pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
                               1549                 :                : {
                               1550                 :            280 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                               1551                 :                :                                      PARALLEL_APPLY_LOCK_STREAM, lockmode);
                               1552                 :            280 : }
                               1553                 :                : 
                               1554                 :                : /*
                               1555                 :                :  * Helper functions to acquire and release a lock for each local transaction
                               1556                 :                :  * apply.
                               1557                 :                :  *
                               1558                 :                :  * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
                               1559                 :                :  * transaction lock.
                               1560                 :                :  *
                               1561                 :                :  * Note that all callers must pass a remote transaction ID instead of a
                               1562                 :                :  * local transaction ID as xid. This is because the local transaction ID is
                               1563                 :                :  * only assigned while applying the first change in the parallel apply worker,
                               1564                 :                :  * but that first change may be blocked by a concurrently executing
                               1565                 :                :  * transaction in another parallel apply worker. The local transaction ID can
                               1566                 :                :  * only be communicated to the leader after the first change is applied, so
                               1567                 :                :  * the leader would not be able to use this lock to wait after sending the
                               1568                 :                :  * xact finish command.
                               1569                 :                :  *
                               1570                 :                :  * Refer to the comments atop this file to see how the transaction lock is
                               1571                 :                :  * used.
                               1572                 :                :  */
                               1573                 :                : void
                               1574                 :             52 : pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
                               1575                 :                : {
                               1576                 :             52 :     LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                               1577                 :                :                                    PARALLEL_APPLY_LOCK_XACT, lockmode);
                               1578                 :             51 : }
                               1579                 :                : 
                               1580                 :                : void
                               1581                 :             48 : pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
                               1582                 :                : {
                               1583                 :             48 :     UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
                               1584                 :                :                                      PARALLEL_APPLY_LOCK_XACT, lockmode);
                               1585                 :             48 : }
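
The stream and transaction helpers above lock on the same subscription and remote xid and differ only in the value placed in the fourth lock tag field. The sketch below illustrates why that is enough to keep the two locks independent. It is a simplified model, not PostgreSQL's actual lock tag layout: `DemoLockTag`, `demo_make_tag`, `demo_tag_equal`, and the `DEMO_LOCK_*` values are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical lock kinds standing in for the stream and xact lock markers. */
enum
{
    DEMO_LOCK_STREAM = 0,
    DEMO_LOCK_XACT = 1
};

/* Hypothetical, simplified lock tag: the real tag has more fields. */
typedef struct
{
    uint32_t    subid;      /* subscription the worker belongs to */
    uint32_t    xid;        /* remote transaction ID */
    uint16_t    kind;       /* distinguishes stream lock from xact lock */
} DemoLockTag;

static DemoLockTag
demo_make_tag(uint32_t subid, uint32_t xid, uint16_t kind)
{
    DemoLockTag tag = {subid, xid, kind};

    return tag;
}

/* Two requests conflict only if every field of the tag matches. */
static bool
demo_tag_equal(DemoLockTag a, DemoLockTag b)
{
    return a.subid == b.subid && a.xid == b.xid && a.kind == b.kind;
}
```

With this model, a stream lock and a transaction lock taken for the same remote xid map to different tags, so holding one never blocks a request for the other.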
                               1586                 :                : 
                               1587                 :                : /*
                               1588                 :                :  * Decrement the number of pending streaming blocks and wait on the stream lock
                               1589                 :                :  * if there is no pending block available.
                               1590                 :                :  */
                               1591                 :                : void
                               1592                 :            261 : pa_decr_and_wait_stream_block(void)
                               1593                 :                : {
                               1594         [ -  + ]:            261 :     Assert(am_parallel_apply_worker());
                               1595                 :                : 
                               1596                 :                :     /*
                               1597                 :                :      * It is only possible to not have any pending stream chunks when we are
                               1598                 :                :      * applying spooled messages.
                               1599                 :                :      */
                               1600         [ +  + ]:            261 :     if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
                               1601                 :                :     {
                               1602         [ +  - ]:             16 :         if (pa_has_spooled_message_pending())
                               1603                 :             16 :             return;
                               1604                 :                : 
  971 akapila@postgresql.o     1605         [ #  # ]:UBC           0 :         elog(ERROR, "invalid pending streaming chunk 0");
                               1606                 :                :     }
                               1607                 :                : 
  971 akapila@postgresql.o     1608         [ +  + ]:CBC         245 :     if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
                               1609                 :                :     {
                               1610                 :             24 :         pa_lock_stream(MyParallelShared->xid, AccessShareLock);
                               1611                 :             22 :         pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
                               1612                 :                :     }
                               1613                 :                : }
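
The decision logic in the function above can be isolated from the locking calls. The sketch below models it with C11 atomics, under the assumption that the caller performs the actual wait once the counter drops to zero. `BlockOutcome` and `decr_pending_block` are hypothetical names, and `atomic_fetch_sub` (which returns the old value) stands in for `pg_atomic_sub_fetch_u32` (which returns the new value).

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical outcome codes; not part of PostgreSQL. */
typedef enum
{
    BLOCK_AVAILABLE,        /* more blocks remain queued; keep applying */
    BLOCK_MUST_WAIT,        /* counter reached zero; caller waits on the lock */
    BLOCK_SPOOLED,          /* counter already zero, spooled messages pending */
    BLOCK_INVALID           /* counter already zero and nothing spooled */
} BlockOutcome;

static BlockOutcome
decr_pending_block(atomic_uint *pending, bool spooled_pending)
{
    assert(pending != NULL);

    /* A zero counter is only legitimate while replaying spooled messages. */
    if (atomic_load(pending) == 0)
        return spooled_pending ? BLOCK_SPOOLED : BLOCK_INVALID;

    /* fetch_sub returns the old value, so old == 1 means the new value is 0. */
    if (atomic_fetch_sub(pending, 1) == 1)
        return BLOCK_MUST_WAIT;

    return BLOCK_AVAILABLE;
}
```

In the listing, the `BLOCK_MUST_WAIT` case corresponds to acquiring and immediately releasing the stream lock in share mode, which blocks until the exclusive holder lets go.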
                               1614                 :                : 
                               1615                 :                : /*
                               1616                 :                :  * Finish processing the streaming transaction in the leader apply worker.
                               1617                 :                :  */
                               1618                 :                : void
                               1619                 :             25 : pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
                               1620                 :                : {
                               1621         [ -  + ]:             25 :     Assert(am_leader_apply_worker());
                               1622                 :                : 
                               1623                 :                :     /*
                               1624                 :                :      * Unlock the shared object lock so that the parallel apply worker can
                               1625                 :                :      * continue to receive and apply changes.
                               1626                 :                :      */
                               1627                 :             25 :     pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
                               1628                 :                : 
                               1629                 :                :     /*
                               1630                 :                :      * Wait for that worker to finish. This is necessary to maintain commit
                               1631                 :                :      * order, which avoids failures due to transaction dependencies and
                               1632                 :                :      * deadlocks.
                               1633                 :                :      */
                               1634                 :             25 :     pa_wait_for_xact_finish(winfo);
                               1635                 :                : 
                               1636         [ +  + ]:             24 :     if (!XLogRecPtrIsInvalid(remote_lsn))
                               1637                 :             22 :         store_flush_position(remote_lsn, winfo->shared->last_commit_end);
                               1638                 :                : 
                               1639                 :             24 :     pa_free_worker(winfo);
                               1640                 :             24 : }
        
