LCOV - differential code coverage report
Current view: top level - src/backend/replication/pgoutput - pgoutput.c (source / functions) Coverage Total Hit UNC UBC GIC GNC CBC DUB DCB
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7 Lines: 95.9 % 802 769 33 27 742 17
Current Date: 2026-03-14 14:10:32 -0400 Functions: 100.0 % 42 42 13 29 3
Baseline: lcov-20260315-024220-baseline Branches: 73.4 % 530 389 1 140 2 13 374 5 11
Baseline Date: 2026-03-14 15:27:56 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 16 16 16
(30,360] days: 100.0 % 21 21 11 10
(360..) days: 95.7 % 765 732 33 732
Function coverage date bins:
(7,30] days: 100.0 % 2 2 2
(30,360] days: 100.0 % 3 3 1 2
(360..) days: 100.0 % 37 37 10 27
Branch coverage date bins:
(7,30] days: 100.0 % 8 8 8
(30,360] days: 80.0 % 10 8 1 1 5 3
(360..) days: 72.9 % 512 373 139 2 371

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pgoutput.c
                                  4                 :                :  *      Logical Replication output plugin
                                  5                 :                :  *
                                  6                 :                :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/backend/replication/pgoutput/pgoutput.c
                                 10                 :                :  *
                                 11                 :                :  *-------------------------------------------------------------------------
                                 12                 :                :  */
                                 13                 :                : #include "postgres.h"
                                 14                 :                : 
                                 15                 :                : #include "access/tupconvert.h"
                                 16                 :                : #include "catalog/partition.h"
                                 17                 :                : #include "catalog/pg_publication.h"
                                 18                 :                : #include "catalog/pg_publication_rel.h"
                                 19                 :                : #include "catalog/pg_subscription.h"
                                 20                 :                : #include "commands/defrem.h"
                                 21                 :                : #include "commands/subscriptioncmds.h"
                                 22                 :                : #include "executor/executor.h"
                                 23                 :                : #include "fmgr.h"
                                 24                 :                : #include "nodes/makefuncs.h"
                                 25                 :                : #include "parser/parse_relation.h"
                                 26                 :                : #include "replication/logical.h"
                                 27                 :                : #include "replication/logicalproto.h"
                                 28                 :                : #include "replication/origin.h"
                                 29                 :                : #include "replication/pgoutput.h"
                                 30                 :                : #include "rewrite/rewriteHandler.h"
                                 31                 :                : #include "utils/builtins.h"
                                 32                 :                : #include "utils/inval.h"
                                 33                 :                : #include "utils/lsyscache.h"
                                 34                 :                : #include "utils/memutils.h"
                                 35                 :                : #include "utils/rel.h"
                                 36                 :                : #include "utils/syscache.h"
                                 37                 :                : #include "utils/varlena.h"
                                 38                 :                : 
  354 tgl@sss.pgh.pa.us          39                 :CBC         577 : PG_MODULE_MAGIC_EXT(
                                 40                 :                :                     .name = "pgoutput",
                                 41                 :                :                     .version = PG_VERSION
                                 42                 :                : );
                                 43                 :                : 
                                 44                 :                : static void pgoutput_startup(LogicalDecodingContext *ctx,
                                 45                 :                :                              OutputPluginOptions *opt, bool is_init);
                                 46                 :                : static void pgoutput_shutdown(LogicalDecodingContext *ctx);
                                 47                 :                : static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
                                 48                 :                :                                ReorderBufferTXN *txn);
                                 49                 :                : static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
                                 50                 :                :                                 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
                                 51                 :                : static void pgoutput_change(LogicalDecodingContext *ctx,
                                 52                 :                :                             ReorderBufferTXN *txn, Relation relation,
                                 53                 :                :                             ReorderBufferChange *change);
                                 54                 :                : static void pgoutput_truncate(LogicalDecodingContext *ctx,
                                 55                 :                :                               ReorderBufferTXN *txn, int nrelations, Relation relations[],
                                 56                 :                :                               ReorderBufferChange *change);
                                 57                 :                : static void pgoutput_message(LogicalDecodingContext *ctx,
                                 58                 :                :                              ReorderBufferTXN *txn, XLogRecPtr message_lsn,
                                 59                 :                :                              bool transactional, const char *prefix,
                                 60                 :                :                              Size sz, const char *message);
                                 61                 :                : static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                 62                 :                :                                    ReplOriginId origin_id);
                                 63                 :                : static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
                                 64                 :                :                                        ReorderBufferTXN *txn);
                                 65                 :                : static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
                                 66                 :                :                                  ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
                                 67                 :                : static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
                                 68                 :                :                                          ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
                                 69                 :                : static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                 70                 :                :                                            ReorderBufferTXN *txn,
                                 71                 :                :                                            XLogRecPtr prepare_end_lsn,
                                 72                 :                :                                            TimestampTz prepare_time);
                                 73                 :                : static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
                                 74                 :                :                                   ReorderBufferTXN *txn);
                                 75                 :                : static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
                                 76                 :                :                                  ReorderBufferTXN *txn);
                                 77                 :                : static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
                                 78                 :                :                                   ReorderBufferTXN *txn,
                                 79                 :                :                                   XLogRecPtr abort_lsn);
                                 80                 :                : static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
                                 81                 :                :                                    ReorderBufferTXN *txn,
                                 82                 :                :                                    XLogRecPtr commit_lsn);
                                 83                 :                : static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
                                 84                 :                :                                         ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
                                 85                 :                : 
                                 86                 :                : static bool publications_valid;
                                 87                 :                : 
                                 88                 :                : static List *LoadPublications(List *pubnames);
                                 89                 :                : static void publication_invalidation_cb(Datum arg, SysCacheIdentifier cacheid,
                                 90                 :                :                                         uint32 hashvalue);
                                 91                 :                : static void send_repl_origin(LogicalDecodingContext *ctx,
                                 92                 :                :                              ReplOriginId origin_id, XLogRecPtr origin_lsn,
                                 93                 :                :                              bool send_origin);
                                 94                 :                : 
                                 95                 :                : /*
                                 96                 :                :  * Only 3 publication actions are used for row filtering ("insert", "update",
                                 97                 :                :  * "delete"). See RelationSyncEntry.exprstate[].
                                 98                 :                :  */
                                 99                 :                : enum RowFilterPubAction
                                100                 :                : {
                                101                 :                :     PUBACTION_INSERT,
                                102                 :                :     PUBACTION_UPDATE,
                                103                 :                :     PUBACTION_DELETE,
                                104                 :                : };
                                105                 :                : 
                                106                 :                : #define NUM_ROWFILTER_PUBACTIONS (PUBACTION_DELETE+1)
                                107                 :                : 
                                108                 :                : /*
                                109                 :                :  * Entry in the map used to remember which relation schemas we sent.
                                110                 :                :  *
                                111                 :                :  * The schema_sent flag determines if the current schema record for the
                                112                 :                :  * relation (and for its ancestor if publish_as_relid is set) was already
                                113                 :                :  * sent to the subscriber (in which case we don't need to send it again).
                                114                 :                :  *
                                115                 :                :  * The schema cache on downstream is however updated only at commit time,
                                116                 :                :  * and with streamed transactions the commit order may be different from
                                117                 :                :  * the order the transactions are sent in. Also, the (sub) transactions
                                118                 :                :  * might get aborted so we need to send the schema for each (sub) transaction
                                119                 :                :  * so that we don't lose the schema information on abort. For handling this,
                                120                 :                :  * we maintain the list of xids (streamed_txns) for those we have already sent
                                121                 :                :  * the schema.
                                122                 :                :  *
                                123                 :                :  * For partitions, 'pubactions' considers not only the table's own
                                124                 :                :  * publications, but also those of all of its ancestors.
                                125                 :                :  */
                                126                 :                : typedef struct RelationSyncEntry
                                127                 :                : {
                                128                 :                :     Oid         relid;          /* relation oid */
                                129                 :                : 
                                130                 :                :     bool        replicate_valid;    /* overall validity flag for entry */
                                131                 :                : 
                                132                 :                :     bool        schema_sent;
                                133                 :                : 
                                134                 :                :     /*
                                135                 :                :      * This will be PUBLISH_GENCOLS_STORED if the relation contains generated
                                136                 :                :      * columns and the 'publish_generated_columns' parameter is set to
                                137                 :                :      * PUBLISH_GENCOLS_STORED. Otherwise, it will be PUBLISH_GENCOLS_NONE,
                                138                 :                :      * indicating that no generated columns should be published, unless
                                139                 :                :      * explicitly specified in the column list.
                                140                 :                :      */
                                141                 :                :     PublishGencolsType include_gencols_type;
                                142                 :                :     List       *streamed_txns;  /* streamed toplevel transactions with this
                                143                 :                :                                  * schema */
                                144                 :                : 
                                145                 :                :     /* are we publishing this rel? */
                                146                 :                :     PublicationActions pubactions;
                                147                 :                : 
                                148                 :                :     /*
                                149                 :                :      * ExprState array for row filter. Different publication actions don't
                                150                 :                :      * allow multiple expressions to always be combined into one, because
                                151                 :                :      * updates or deletes restrict the column in expression to be part of the
                                152                 :                :      * replica identity index whereas inserts do not have this restriction, so
                                153                 :                :      * there is one ExprState per publication action.
                                154                 :                :      */
                                155                 :                :     ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
                                156                 :                :     EState     *estate;         /* executor state used for row filter */
                                157                 :                :     TupleTableSlot *new_slot;   /* slot for storing new tuple */
                                158                 :                :     TupleTableSlot *old_slot;   /* slot for storing old tuple */
                                159                 :                : 
                                160                 :                :     /*
                                161                 :                :      * OID of the relation to publish changes as.  For a partition, this may
                                162                 :                :      * be set to one of its ancestors whose schema will be used when
                                163                 :                :      * replicating changes, if publish_via_partition_root is set for the
                                164                 :                :      * publication.
                                165                 :                :      */
                                166                 :                :     Oid         publish_as_relid;
                                167                 :                : 
                                168                 :                :     /*
                                169                 :                :      * Map used when replicating using an ancestor's schema to convert tuples
                                170                 :                :      * from partition's type to the ancestor's; NULL if publish_as_relid is
                                171                 :                :      * same as 'relid' or if unnecessary due to partition and the ancestor
                                172                 :                :      * having identical TupleDesc.
                                173                 :                :      */
                                174                 :                :     AttrMap    *attrmap;
                                175                 :                : 
                                176                 :                :     /*
                                177                 :                :      * Columns included in the publication, or NULL if all columns are
                                178                 :                :      * included implicitly.  Note that the attnums in this bitmap are not
                                179                 :                :      * shifted by FirstLowInvalidHeapAttributeNumber.
                                180                 :                :      */
                                181                 :                :     Bitmapset  *columns;
                                182                 :                : 
                                183                 :                :     /*
                                184                 :                :      * Private context to store additional data for this entry - state for the
                                185                 :                :      * row filter expressions, column list, etc.
                                186                 :                :      */
                                187                 :                :     MemoryContext entry_cxt;
                                188                 :                : } RelationSyncEntry;
                                189                 :                : 
                                190                 :                : /*
                                191                 :                :  * Maintain a per-transaction level variable to track whether the transaction
                                192                 :                :  * has sent BEGIN. BEGIN is only sent when the first change in a transaction
                                193                 :                :  * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
                                194                 :                :  * messages for empty transactions which saves network bandwidth.
                                195                 :                :  *
                                196                 :                :  * This optimization is not used for prepared transactions because if the
                                197                 :                :  * WALSender restarts after prepare of a transaction and before commit prepared
                                198                 :                :  * of the same transaction then we won't be able to figure out if we have
                                199                 :                :  * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
                                200                 :                :  * because we would have lost the in-memory txndata information that was
                                201                 :                :  * present prior to the restart. This will result in sending a spurious
                                202                 :                :  * COMMIT PREPARED without a corresponding prepared transaction at the
                                203                 :                :  * downstream which would lead to an error when it tries to process it.
                                204                 :                :  *
                                205                 :                :  * XXX We could achieve this optimization by changing protocol to send
                                206                 :                :  * additional information so that downstream can detect that the corresponding
                                207                 :                :  * prepare has not been sent. However, adding such a check for every
                                208                 :                :  * transaction in the downstream could be costly so we might want to do it
                                209                 :                :  * optionally.
                                210                 :                :  *
                                211                 :                :  * We also don't have this optimization for streamed transactions because
                                212                 :                :  * they can contain prepared transactions.
                                213                 :                :  */
                                214                 :                : typedef struct PGOutputTxnData
                                215                 :                : {
                                216                 :                :     bool        sent_begin_txn; /* flag indicating whether BEGIN has been sent */
                                217                 :                : } PGOutputTxnData;
                                218                 :                : 
                                219                 :                : /* Map used to remember which relation schemas we sent. */
                                220                 :                : static HTAB *RelationSyncCache = NULL;
                                221                 :                : 
                                222                 :                : static void init_rel_sync_cache(MemoryContext cachectx);
                                223                 :                : static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
                                224                 :                : static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
                                225                 :                :                                              Relation relation);
                                226                 :                : static void send_relation_and_attrs(Relation relation, TransactionId xid,
                                227                 :                :                                     LogicalDecodingContext *ctx,
                                228                 :                :                                     RelationSyncEntry *relentry);
                                229                 :                : static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
                                230                 :                : static void rel_sync_cache_publication_cb(Datum arg, SysCacheIdentifier cacheid,
                                231                 :                :                                           uint32 hashvalue);
                                232                 :                : static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
                                233                 :                :                                             TransactionId xid);
                                234                 :                : static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
                                235                 :                :                                             TransactionId xid);
                                236                 :                : static void init_tuple_slot(PGOutputData *data, Relation relation,
                                237                 :                :                             RelationSyncEntry *entry);
                                238                 :                : static void pgoutput_memory_context_reset(void *arg);
                                239                 :                : 
                                240                 :                : /* row filter routines */
                                241                 :                : static EState *create_estate_for_relation(Relation rel);
                                242                 :                : static void pgoutput_row_filter_init(PGOutputData *data,
                                243                 :                :                                      List *publications,
                                244                 :                :                                      RelationSyncEntry *entry);
                                245                 :                : static bool pgoutput_row_filter_exec_expr(ExprState *state,
                                246                 :                :                                           ExprContext *econtext);
                                247                 :                : static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
                                248                 :                :                                 TupleTableSlot **new_slot_ptr,
                                249                 :                :                                 RelationSyncEntry *entry,
                                250                 :                :                                 ReorderBufferChangeType *action);
                                251                 :                : 
                                252                 :                : /* column list routines */
                                253                 :                : static void pgoutput_column_list_init(PGOutputData *data,
                                254                 :                :                                       List *publications,
                                255                 :                :                                       RelationSyncEntry *entry);
                                256                 :                : 
                                257                 :                : /*
                                258                 :                :  * Specify output plugin callbacks
                                259                 :                :  */
                                260                 :                : void
 3342 peter_e@gmx.net           261                 :            775 : _PG_output_plugin_init(OutputPluginCallbacks *cb)
                                262                 :                : {
                                263                 :            775 :     cb->startup_cb = pgoutput_startup;
                                264                 :            775 :     cb->begin_cb = pgoutput_begin_txn;
                                265                 :            775 :     cb->change_cb = pgoutput_change;
 2899                           266                 :            775 :     cb->truncate_cb = pgoutput_truncate;
 1804 akapila@postgresql.o      267                 :            775 :     cb->message_cb = pgoutput_message;
 3342 peter_e@gmx.net           268                 :            775 :     cb->commit_cb = pgoutput_commit_txn;
                                269                 :                : 
 1705 akapila@postgresql.o      270                 :            775 :     cb->begin_prepare_cb = pgoutput_begin_prepare_txn;
                                271                 :            775 :     cb->prepare_cb = pgoutput_prepare_txn;
                                272                 :            775 :     cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
                                273                 :            775 :     cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 3342 peter_e@gmx.net           274                 :            775 :     cb->filter_by_origin_cb = pgoutput_origin_filter;
                                275                 :            775 :     cb->shutdown_cb = pgoutput_shutdown;
                                276                 :                : 
                                277                 :                :     /* transaction streaming */
 2019 akapila@postgresql.o      278                 :            775 :     cb->stream_start_cb = pgoutput_stream_start;
                                279                 :            775 :     cb->stream_stop_cb = pgoutput_stream_stop;
                                280                 :            775 :     cb->stream_abort_cb = pgoutput_stream_abort;
                                281                 :            775 :     cb->stream_commit_cb = pgoutput_stream_commit;
                                282                 :            775 :     cb->stream_change_cb = pgoutput_change;
 1804                           283                 :            775 :     cb->stream_message_cb = pgoutput_message;
 2019                           284                 :            775 :     cb->stream_truncate_cb = pgoutput_truncate;
                                285                 :                :     /* transaction streaming - two-phase commit */
 1684                           286                 :            775 :     cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
 3342 peter_e@gmx.net           287                 :            775 : }
                                288                 :                : 
                                289                 :                : static void
 1804 akapila@postgresql.o      290                 :            426 : parse_output_parameters(List *options, PGOutputData *data)
                                291                 :                : {
                                292                 :                :     ListCell   *lc;
 3342 peter_e@gmx.net           293                 :            426 :     bool        protocol_version_given = false;
                                294                 :            426 :     bool        publication_names_given = false;
 2066 tgl@sss.pgh.pa.us         295                 :            426 :     bool        binary_option_given = false;
 1804 akapila@postgresql.o      296                 :            426 :     bool        messages_option_given = false;
 2019                           297                 :            426 :     bool        streaming_given = false;
 1705                           298                 :            426 :     bool        two_phase_option_given = false;
 1333                           299                 :            426 :     bool        origin_option_given = false;
                                300                 :                : 
                                301                 :                :     /* Initialize optional parameters to defaults */
 1804                           302                 :            426 :     data->binary = false;
 1161                           303                 :            426 :     data->streaming = LOGICALREP_STREAM_OFF;
 1804                           304                 :            426 :     data->messages = false;
 1705                           305                 :            426 :     data->two_phase = false;
  242 fujii@postgresql.org      306                 :GNC         426 :     data->publish_no_origin = false;
                                307                 :                : 
 3342 peter_e@gmx.net           308   [ +  -  +  +  :CBC        2118 :     foreach(lc, options)
                                              +  + ]
                                309                 :                :     {
                                310                 :           1692 :         DefElem    *defel = (DefElem *) lfirst(lc);
                                311                 :                : 
                                312   [ +  -  -  + ]:           1692 :         Assert(defel->arg == NULL || IsA(defel->arg, String));
                                313                 :                : 
                                314                 :                :         /* Check each param, whether or not we recognize it */
                                315         [ +  + ]:           1692 :         if (strcmp(defel->defname, "proto_version") == 0)
                                316                 :                :         {
                                317                 :                :             unsigned long parsed;
                                318                 :                :             char       *endptr;
                                319                 :                : 
                                320         [ -  + ]:            426 :             if (protocol_version_given)
 3342 peter_e@gmx.net           321         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                322                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                323                 :                :                          errmsg("conflicting or redundant options")));
 3342 peter_e@gmx.net           324                 :CBC         426 :             protocol_version_given = true;
                                325                 :                : 
 1490 peter@eisentraut.org      326                 :            426 :             errno = 0;
                                327                 :            426 :             parsed = strtoul(strVal(defel->arg), &endptr, 10);
                                328   [ +  -  -  + ]:            426 :             if (errno != 0 || *endptr != '\0')
 3342 peter_e@gmx.net           329         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                330                 :                :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                331                 :                :                          errmsg("invalid proto_version")));
                                332                 :                : 
 1490 peter@eisentraut.org      333         [ -  + ]:CBC         426 :             if (parsed > PG_UINT32_MAX)
 3342 peter_e@gmx.net           334         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                335                 :                :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                336                 :                :                          errmsg("proto_version \"%s\" out of range",
                                337                 :                :                                 strVal(defel->arg))));
                                338                 :                : 
 1804 akapila@postgresql.o      339                 :CBC         426 :             data->protocol_version = (uint32) parsed;
                                340                 :                :         }
 3342 peter_e@gmx.net           341         [ +  + ]:           1266 :         else if (strcmp(defel->defname, "publication_names") == 0)
                                342                 :                :         {
                                343         [ -  + ]:            426 :             if (publication_names_given)
 3342 peter_e@gmx.net           344         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                345                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                346                 :                :                          errmsg("conflicting or redundant options")));
 3342 peter_e@gmx.net           347                 :CBC         426 :             publication_names_given = true;
                                348                 :                : 
                                349                 :                :             /*
                                350                 :                :              * Pass a copy of the DefElem->arg since SplitIdentifierString
                                351                 :                :              * modifies its input.
                                352                 :                :              */
   68 drowley@postgresql.o      353         [ -  + ]:            426 :             if (!SplitIdentifierString(pstrdup(strVal(defel->arg)), ',',
                                354                 :                :                                        &data->publication_names))
 3224 bruce@momjian.us          355         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                356                 :                :                         (errcode(ERRCODE_INVALID_NAME),
                                357                 :                :                          errmsg("invalid publication_names syntax")));
                                358                 :                :         }
 2066 tgl@sss.pgh.pa.us         359         [ +  + ]:CBC         840 :         else if (strcmp(defel->defname, "binary") == 0)
                                360                 :                :         {
                                361         [ -  + ]:             10 :             if (binary_option_given)
 2066 tgl@sss.pgh.pa.us         362         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                363                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                364                 :                :                          errmsg("conflicting or redundant options")));
 2066 tgl@sss.pgh.pa.us         365                 :CBC          10 :             binary_option_given = true;
                                366                 :                : 
 1804 akapila@postgresql.o      367                 :             10 :             data->binary = defGetBoolean(defel);
                                368                 :                :         }
                                369         [ +  + ]:            830 :         else if (strcmp(defel->defname, "messages") == 0)
                                370                 :                :         {
                                371         [ -  + ]:              4 :             if (messages_option_given)
 1804 akapila@postgresql.o      372         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                373                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                374                 :                :                          errmsg("conflicting or redundant options")));
 1804 akapila@postgresql.o      375                 :CBC           4 :             messages_option_given = true;
                                376                 :                : 
                                377                 :              4 :             data->messages = defGetBoolean(defel);
                                378                 :                :         }
 2019                           379         [ +  + ]:            826 :         else if (strcmp(defel->defname, "streaming") == 0)
                                380                 :                :         {
                                381         [ -  + ]:            405 :             if (streaming_given)
 2019 akapila@postgresql.o      382         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                383                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                384                 :                :                          errmsg("conflicting or redundant options")));
 2019 akapila@postgresql.o      385                 :CBC         405 :             streaming_given = true;
                                386                 :                : 
 1161                           387                 :            405 :             data->streaming = defGetStreamingMode(defel);
                                388                 :                :         }
 1705                           389         [ +  + ]:            421 :         else if (strcmp(defel->defname, "two_phase") == 0)
                                390                 :                :         {
                                391         [ -  + ]:              7 :             if (two_phase_option_given)
 1705 akapila@postgresql.o      392         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                393                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                394                 :                :                          errmsg("conflicting or redundant options")));
 1705 akapila@postgresql.o      395                 :CBC           7 :             two_phase_option_given = true;
                                396                 :                : 
                                397                 :              7 :             data->two_phase = defGetBoolean(defel);
                                398                 :                :         }
 1333                           399         [ +  - ]:            414 :         else if (strcmp(defel->defname, "origin") == 0)
                                400                 :                :         {
                                401                 :                :             char       *origin;
                                402                 :                : 
                                403         [ -  + ]:            414 :             if (origin_option_given)
 1333 akapila@postgresql.o      404         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                405                 :                :                         errcode(ERRCODE_SYNTAX_ERROR),
                                406                 :                :                         errmsg("conflicting or redundant options"));
 1333 akapila@postgresql.o      407                 :CBC         414 :             origin_option_given = true;
                                408                 :                : 
  900                           409                 :            414 :             origin = defGetString(defel);
                                410         [ +  + ]:            414 :             if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0)
                                411                 :             27 :                 data->publish_no_origin = true;
                                412         [ +  - ]:            387 :             else if (pg_strcasecmp(origin, LOGICALREP_ORIGIN_ANY) == 0)
                                413                 :            387 :                 data->publish_no_origin = false;
                                414                 :                :             else
 1333 akapila@postgresql.o      415         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                416                 :                :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                417                 :                :                         errmsg("unrecognized origin value: \"%s\"", origin));
                                418                 :                :         }
                                419                 :                :         else
 3342 peter_e@gmx.net           420         [ #  # ]:              0 :             elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
                                421                 :                :     }
                                422                 :                : 
                                423                 :                :     /* Check required options */
  817 akapila@postgresql.o      424         [ -  + ]:CBC         426 :     if (!protocol_version_given)
  817 akapila@postgresql.o      425         [ #  # ]:UBC           0 :         ereport(ERROR,
                                426                 :                :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                427                 :                :                 errmsg("option \"%s\" missing", "proto_version"));
  817 akapila@postgresql.o      428         [ -  + ]:CBC         426 :     if (!publication_names_given)
  817 akapila@postgresql.o      429         [ #  # ]:UBC           0 :         ereport(ERROR,
                                430                 :                :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                431                 :                :                 errmsg("option \"%s\" missing", "publication_names"));
 3342 peter_e@gmx.net           432                 :CBC         426 : }
                                433                 :                : 
                                434                 :                : /*
                                435                 :                :  * Memory context reset callback of PGOutputData->context.
                                436                 :                :  */
                                437                 :                : static void
  157 msawada@postgresql.o      438                 :           1108 : pgoutput_memory_context_reset(void *arg)
                                439                 :                : {
                                440         [ +  + ]:           1108 :     if (RelationSyncCache)
                                441                 :                :     {
                                442                 :            205 :         hash_destroy(RelationSyncCache);
                                443                 :            205 :         RelationSyncCache = NULL;
                                444                 :                :     }
                                445                 :           1108 : }
                                446                 :                : 
                                447                 :                : /*
                                448                 :                :  * Initialize this plugin
                                449                 :                :  */
                                450                 :                : static void
 3224 bruce@momjian.us          451                 :            775 : pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                452                 :                :                  bool is_init)
                                453                 :                : {
   95 michael@paquier.xyz       454                 :GNC         775 :     PGOutputData *data = palloc0_object(PGOutputData);
                                455                 :                :     static bool publication_callback_registered = false;
                                456                 :                :     MemoryContextCallback *mcallback;
                                457                 :                : 
                                458                 :                :     /* Create our memory context for private allocations. */
 3342 peter_e@gmx.net           459                 :CBC         775 :     data->context = AllocSetContextCreate(ctx->context,
                                460                 :                :                                           "logical replication output context",
                                461                 :                :                                           ALLOCSET_DEFAULT_SIZES);
                                462                 :                : 
 1482 akapila@postgresql.o      463                 :            775 :     data->cachectx = AllocSetContextCreate(ctx->context,
                                464                 :                :                                            "logical replication cache context",
                                465                 :                :                                            ALLOCSET_DEFAULT_SIZES);
                                466                 :                : 
  461 michael@paquier.xyz       467                 :            775 :     data->pubctx = AllocSetContextCreate(ctx->context,
                                468                 :                :                                          "logical replication publication list context",
                                469                 :                :                                          ALLOCSET_SMALL_SIZES);
                                470                 :                : 
                                471                 :                :     /*
                                472                 :                :      * Ensure to cleanup RelationSyncCache even when logical decoding invoked
                                473                 :                :      * via SQL interface ends up with an error.
                                474                 :                :      */
   95 michael@paquier.xyz       475                 :GNC         775 :     mcallback = palloc0_object(MemoryContextCallback);
  157 msawada@postgresql.o      476                 :CBC         775 :     mcallback->func = pgoutput_memory_context_reset;
                                477                 :            775 :     MemoryContextRegisterResetCallback(ctx->context, mcallback);
                                478                 :                : 
 3342 peter_e@gmx.net           479                 :            775 :     ctx->output_plugin_private = data;
                                480                 :                : 
                                481                 :                :     /* This plugin uses binary protocol. */
                                482                 :            775 :     opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
                                483                 :                : 
                                484                 :                :     /*
                                485                 :                :      * This is replication start and not slot initialization.
                                486                 :                :      *
                                487                 :                :      * Parse and validate options passed by the client.
                                488                 :                :      */
                                489         [ +  + ]:            775 :     if (!is_init)
                                490                 :                :     {
                                491                 :                :         /* Parse the params and ERROR if we see any we don't recognize */
 1804 akapila@postgresql.o      492                 :            426 :         parse_output_parameters(ctx->output_plugin_options, data);
                                493                 :                : 
                                494                 :                :         /* Check if we support requested protocol */
 1996                           495         [ -  + ]:            426 :         if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
 3342 peter_e@gmx.net           496         [ #  # ]:UBC           0 :             ereport(ERROR,
                                497                 :                :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                498                 :                :                      errmsg("client sent proto_version=%d but server only supports protocol %d or lower",
                                499                 :                :                             data->protocol_version, LOGICALREP_PROTO_MAX_VERSION_NUM)));
                                500                 :                : 
 3342 peter_e@gmx.net           501         [ -  + ]:CBC         426 :         if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
 3342 peter_e@gmx.net           502         [ #  # ]:UBC           0 :             ereport(ERROR,
                                503                 :                :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                504                 :                :                      errmsg("client sent proto_version=%d but server only supports protocol %d or higher",
                                505                 :                :                             data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
                                506                 :                : 
                                507                 :                :         /*
                                508                 :                :          * Decide whether to enable streaming. It is disabled by default, in
                                509                 :                :          * which case we just update the flag in decoding context. Otherwise
                                510                 :                :          * we only allow it with sufficient version of the protocol, and when
                                511                 :                :          * the output plugin supports it.
                                512                 :                :          */
 1161 akapila@postgresql.o      513         [ +  + ]:CBC         426 :         if (data->streaming == LOGICALREP_STREAM_OFF)
 2019                           514                 :             21 :             ctx->streaming = false;
 1161                           515         [ +  + ]:            405 :         else if (data->streaming == LOGICALREP_STREAM_ON &&
                                516         [ -  + ]:             26 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
 2019 akapila@postgresql.o      517         [ #  # ]:UBC           0 :             ereport(ERROR,
                                518                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                519                 :                :                      errmsg("requested proto_version=%d does not support streaming, need %d or higher",
                                520                 :                :                             data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
 1161 akapila@postgresql.o      521         [ +  + ]:CBC         405 :         else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
                                522         [ -  + ]:            379 :                  data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
 1161 akapila@postgresql.o      523         [ #  # ]:UBC           0 :             ereport(ERROR,
                                524                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                525                 :                :                      errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
                                526                 :                :                             data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
 2019 akapila@postgresql.o      527         [ -  + ]:CBC         405 :         else if (!ctx->streaming)
 2019 akapila@postgresql.o      528         [ #  # ]:UBC           0 :             ereport(ERROR,
                                529                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                530                 :                :                      errmsg("streaming requested, but not supported by output plugin")));
                                531                 :                : 
                                532                 :                :         /*
                                533                 :                :          * Here, we just check whether the two-phase option is passed by
                                534                 :                :          * plugin and decide whether to enable it at later point of time. It
                                535                 :                :          * remains enabled if the previous start-up has done so. But we only
                                536                 :                :          * allow the option to be passed in with sufficient version of the
                                537                 :                :          * protocol, and when the output plugin supports it.
                                538                 :                :          */
 1705 akapila@postgresql.o      539         [ +  + ]:CBC         426 :         if (!data->two_phase)
                                540                 :            419 :             ctx->twophase_opt_given = false;
                                541         [ -  + ]:              7 :         else if (data->protocol_version < LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)
 1705 akapila@postgresql.o      542         [ #  # ]:UBC           0 :             ereport(ERROR,
                                543                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                544                 :                :                      errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher",
                                545                 :                :                             data->protocol_version, LOGICALREP_PROTO_TWOPHASE_VERSION_NUM)));
 1705 akapila@postgresql.o      546         [ -  + ]:CBC           7 :         else if (!ctx->twophase)
 1705 akapila@postgresql.o      547         [ #  # ]:UBC           0 :             ereport(ERROR,
                                548                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                549                 :                :                      errmsg("two-phase commit requested, but not supported by output plugin")));
                                550                 :                :         else
 1705 akapila@postgresql.o      551                 :CBC           7 :             ctx->twophase_opt_given = true;
                                552                 :                : 
                                553                 :                :         /* Init publication state. */
 3342 peter_e@gmx.net           554                 :            426 :         data->publications = NIL;
                                555                 :            426 :         publications_valid = false;
                                556                 :                : 
                                557                 :                :         /*
                                558                 :                :          * Register callback for pg_publication if we didn't already do that
                                559                 :                :          * during some previous call in this process.
                                560                 :                :          */
 1116 tgl@sss.pgh.pa.us         561         [ +  + ]:            426 :         if (!publication_callback_registered)
                                562                 :                :         {
                                563                 :            424 :             CacheRegisterSyscacheCallback(PUBLICATIONOID,
                                564                 :                :                                           publication_invalidation_cb,
                                565                 :                :                                           (Datum) 0);
  367 akapila@postgresql.o      566                 :            424 :             CacheRegisterRelSyncCallback(rel_sync_cache_relation_cb,
                                567                 :                :                                          (Datum) 0);
 1116 tgl@sss.pgh.pa.us         568                 :            424 :             publication_callback_registered = true;
                                569                 :                :         }
                                570                 :                : 
                                571                 :                :         /* Initialize relation schema cache. */
 3342 peter_e@gmx.net           572                 :            426 :         init_rel_sync_cache(CacheMemoryContext);
                                573                 :                :     }
                                574                 :                :     else
                                575                 :                :     {
                                576                 :                :         /*
                                577                 :                :          * Disable the streaming and prepared transactions during the slot
                                578                 :                :          * initialization mode.
                                579                 :                :          */
 2019 akapila@postgresql.o      580                 :            349 :         ctx->streaming = false;
 1705                           581                 :            349 :         ctx->twophase = false;
                                582                 :                :     }
 3342 peter_e@gmx.net           583                 :            775 : }
                                584                 :                : 
                                585                 :                : /*
                                586                 :                :  * BEGIN callback.
                                587                 :                :  *
                                588                 :                :  * Don't send the BEGIN message here instead postpone it until the first
                                589                 :                :  * change. In logical replication, a common scenario is to replicate a set of
                                590                 :                :  * tables (instead of all tables) and transactions whose changes were on
                                591                 :                :  * the table(s) that are not published will produce empty transactions. These
                                592                 :                :  * empty transactions will send BEGIN and COMMIT messages to subscribers,
                                593                 :                :  * using bandwidth on something with little/no use for logical replication.
                                594                 :                :  */
                                595                 :                : static void
 1403 tgl@sss.pgh.pa.us         596                 :           1096 : pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
                                597                 :                : {
                                598                 :           1096 :     PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
                                599                 :                :                                                       sizeof(PGOutputTxnData));
                                600                 :                : 
 1446 akapila@postgresql.o      601                 :           1096 :     txn->output_plugin_private = txndata;
                                602                 :           1096 : }
                                603                 :                : 
                                604                 :                : /*
                                605                 :                :  * Send BEGIN.
                                606                 :                :  *
                                607                 :                :  * This is called while processing the first change of the transaction.
                                608                 :                :  */
                                609                 :                : static void
                                610                 :            466 : pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
                                611                 :                : {
   46 msawada@postgresql.o      612                 :GNC         466 :     bool        send_replication_origin = txn->origin_id != InvalidReplOriginId;
 1446 akapila@postgresql.o      613                 :CBC         466 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
                                614                 :                : 
                                615         [ -  + ]:            466 :     Assert(txndata);
                                616         [ -  + ]:            466 :     Assert(!txndata->sent_begin_txn);
                                617                 :                : 
 3342 peter_e@gmx.net           618                 :            466 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
                                619                 :            466 :     logicalrep_write_begin(ctx->out, txn);
 1446 akapila@postgresql.o      620                 :            466 :     txndata->sent_begin_txn = true;
                                621                 :                : 
 1705                           622                 :            466 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
                                623                 :                :                      send_replication_origin);
                                624                 :                : 
 3342 peter_e@gmx.net           625                 :            466 :     OutputPluginWrite(ctx, true);
                                626                 :            465 : }
                                627                 :                : 
                                628                 :                : /*
                                629                 :                :  * COMMIT callback
                                630                 :                :  */
                                631                 :                : static void
                                632                 :           1094 : pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                633                 :                :                     XLogRecPtr commit_lsn)
                                634                 :                : {
 1446 akapila@postgresql.o      635                 :           1094 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
                                636                 :                :     bool        sent_begin_txn;
                                637                 :                : 
                                638         [ -  + ]:           1094 :     Assert(txndata);
                                639                 :                : 
                                640                 :                :     /*
                                641                 :                :      * We don't need to send the commit message unless some relevant change
                                642                 :                :      * from this transaction has been sent to the downstream.
                                643                 :                :      */
                                644                 :           1094 :     sent_begin_txn = txndata->sent_begin_txn;
 1131                           645                 :           1094 :     OutputPluginUpdateProgress(ctx, !sent_begin_txn);
 1446                           646                 :           1094 :     pfree(txndata);
                                647                 :           1094 :     txn->output_plugin_private = NULL;
                                648                 :                : 
                                649         [ +  + ]:           1094 :     if (!sent_begin_txn)
                                650                 :                :     {
                                651         [ +  + ]:            629 :         elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
                                652                 :            629 :         return;
                                653                 :                :     }
                                654                 :                : 
 3342 peter_e@gmx.net           655                 :            465 :     OutputPluginPrepareWrite(ctx, true);
                                656                 :            465 :     logicalrep_write_commit(ctx->out, txn, commit_lsn);
                                657                 :            465 :     OutputPluginWrite(ctx, true);
                                658                 :                : }
                                659                 :                : 
                                660                 :                : /*
                                661                 :                :  * BEGIN PREPARE callback
                                662                 :                :  */
                                663                 :                : static void
 1705 akapila@postgresql.o      664                 :             17 : pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
                                665                 :                : {
   46 msawada@postgresql.o      666                 :GNC          17 :     bool        send_replication_origin = txn->origin_id != InvalidReplOriginId;
                                667                 :                : 
 1705 akapila@postgresql.o      668                 :CBC          17 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
                                669                 :             17 :     logicalrep_write_begin_prepare(ctx->out, txn);
                                670                 :                : 
                                671                 :             17 :     send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
                                672                 :                :                      send_replication_origin);
                                673                 :                : 
                                674                 :             17 :     OutputPluginWrite(ctx, true);
                                675                 :             17 : }
                                676                 :                : 
                                677                 :                : /*
                                678                 :                :  * PREPARE callback
                                679                 :                :  */
                                680                 :                : static void
                                681                 :             17 : pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                682                 :                :                      XLogRecPtr prepare_lsn)
                                683                 :                : {
 1131                           684                 :             17 :     OutputPluginUpdateProgress(ctx, false);
                                685                 :                : 
 1705                           686                 :             17 :     OutputPluginPrepareWrite(ctx, true);
                                687                 :             17 :     logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
                                688                 :             17 :     OutputPluginWrite(ctx, true);
                                689                 :             17 : }
                                690                 :                : 
                                691                 :                : /*
                                692                 :                :  * COMMIT PREPARED callback
                                693                 :                :  */
                                694                 :                : static void
                                695                 :             24 : pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                696                 :                :                              XLogRecPtr commit_lsn)
                                697                 :                : {
 1131                           698                 :             24 :     OutputPluginUpdateProgress(ctx, false);
                                699                 :                : 
 1705                           700                 :             24 :     OutputPluginPrepareWrite(ctx, true);
                                701                 :             24 :     logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
                                702                 :             24 :     OutputPluginWrite(ctx, true);
                                703                 :             24 : }
                                704                 :                : 
                                705                 :                : /*
                                706                 :                :  * ROLLBACK PREPARED callback
                                707                 :                :  */
                                708                 :                : static void
                                709                 :              5 : pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                710                 :                :                                ReorderBufferTXN *txn,
                                711                 :                :                                XLogRecPtr prepare_end_lsn,
                                712                 :                :                                TimestampTz prepare_time)
                                713                 :                : {
 1131                           714                 :              5 :     OutputPluginUpdateProgress(ctx, false);
                                715                 :                : 
 1705                           716                 :              5 :     OutputPluginPrepareWrite(ctx, true);
                                717                 :              5 :     logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
                                718                 :                :                                        prepare_time);
                                719                 :              5 :     OutputPluginWrite(ctx, true);
                                720                 :              5 : }
                                721                 :                : 
                                722                 :                : /*
                                723                 :                :  * Write the current schema of the relation and its ancestor (if any) if not
                                724                 :                :  * done yet.
                                725                 :                :  */
                                726                 :                : static void
 2899 peter_e@gmx.net           727                 :         182267 : maybe_send_schema(LogicalDecodingContext *ctx,
                                728                 :                :                   ReorderBufferChange *change,
                                729                 :                :                   Relation relation, RelationSyncEntry *relentry)
                                730                 :                : {
  899 michael@paquier.xyz       731                 :         182267 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
                                732                 :                :     bool        schema_sent;
 2019 akapila@postgresql.o      733                 :         182267 :     TransactionId xid = InvalidTransactionId;
                                734                 :         182267 :     TransactionId topxid = InvalidTransactionId;
                                735                 :                : 
                                736                 :                :     /*
                                737                 :                :      * Remember XID of the (sub)transaction for the change. We don't care if
                                738                 :                :      * it's top-level transaction or not (we have already sent that XID in
                                739                 :                :      * start of the current streaming block).
                                740                 :                :      *
                                741                 :                :      * If we're not in a streaming block, just use InvalidTransactionId and
                                742                 :                :      * the write methods will not include it.
                                743                 :                :      */
  899 michael@paquier.xyz       744         [ +  + ]:         182267 :     if (data->in_streaming)
 2019 akapila@postgresql.o      745                 :         175903 :         xid = change->txn->xid;
                                746                 :                : 
 1094                           747         [ +  + ]:         182267 :     if (rbtxn_is_subtxn(change->txn))
                                748         [ +  - ]:          10169 :         topxid = rbtxn_get_toptxn(change->txn)->xid;
                                749                 :                :     else
 2019                           750                 :         172098 :         topxid = xid;
                                751                 :                : 
                                752                 :                :     /*
                                753                 :                :      * Do we need to send the schema? We do track streamed transactions
                                754                 :                :      * separately, because those may be applied later (and the regular
                                755                 :                :      * transactions won't see their effects until then) and in an order that
                                756                 :                :      * we don't know at this point.
                                757                 :                :      *
                                758                 :                :      * XXX There is a scope of optimization here. Currently, we always send
                                759                 :                :      * the schema first time in a streaming transaction but we can probably
                                760                 :                :      * avoid that by checking 'relentry->schema_sent' flag. However, before
                                761                 :                :      * doing that we need to study its impact on the case where we have a mix
                                762                 :                :      * of streaming and non-streaming transactions.
                                763                 :                :      */
  899 michael@paquier.xyz       764         [ +  + ]:         182267 :     if (data->in_streaming)
 2019 akapila@postgresql.o      765                 :         175903 :         schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
                                766                 :                :     else
                                767                 :           6364 :         schema_sent = relentry->schema_sent;
                                768                 :                : 
                                769                 :                :     /* Nothing to do if we already sent the schema. */
                                770         [ +  + ]:         182267 :     if (schema_sent)
 2167 peter@eisentraut.org      771                 :         181918 :         return;
                                772                 :                : 
                                773                 :                :     /*
                                774                 :                :      * Send the schema.  If the changes will be published using an ancestor's
                                775                 :                :      * schema, not the relation's own, send that ancestor's schema before
                                776                 :                :      * sending relation's own (XXX - maybe sending only the former suffices?).
                                777                 :                :      */
                                778         [ +  + ]:            349 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
                                779                 :                :     {
                                780                 :             34 :         Relation    ancestor = RelationIdGetRelation(relentry->publish_as_relid);
                                781                 :                : 
  493 akapila@postgresql.o      782                 :             34 :         send_relation_and_attrs(ancestor, xid, ctx, relentry);
 2167 peter@eisentraut.org      783                 :             34 :         RelationClose(ancestor);
                                784                 :                :     }
                                785                 :                : 
  493 akapila@postgresql.o      786                 :            349 :     send_relation_and_attrs(relation, xid, ctx, relentry);
                                787                 :                : 
  899 michael@paquier.xyz       788         [ +  + ]:            349 :     if (data->in_streaming)
 2019 akapila@postgresql.o      789                 :             66 :         set_schema_sent_in_streamed_txn(relentry, topxid);
                                790                 :                :     else
                                791                 :            283 :         relentry->schema_sent = true;
                                792                 :                : }
                                793                 :                : 
                                794                 :                : /*
                                795                 :                :  * Sends a relation
                                796                 :                :  */
                                797                 :                : static void
                                798                 :            383 : send_relation_and_attrs(Relation relation, TransactionId xid,
                                799                 :                :                         LogicalDecodingContext *ctx,
                                800                 :                :                         RelationSyncEntry *relentry)
                                801                 :                : {
 2167 peter@eisentraut.org      802                 :            383 :     TupleDesc   desc = RelationGetDescr(relation);
  493 akapila@postgresql.o      803                 :            383 :     Bitmapset  *columns = relentry->columns;
  416                           804                 :            383 :     PublishGencolsType include_gencols_type = relentry->include_gencols_type;
                                805                 :                :     int         i;
                                806                 :                : 
                                807                 :                :     /*
                                808                 :                :      * Write out type info if needed.  We do that only for user-created types.
                                809                 :                :      * We use FirstGenbkiObjectId as the cutoff, so that we only consider
                                810                 :                :      * objects with hand-assigned OIDs to be "built in", not for instance any
                                811                 :                :      * function or type defined in the information_schema. This is important
                                812                 :                :      * because only hand-assigned OIDs can be expected to remain stable across
                                813                 :                :      * major versions.
                                814                 :                :      */
 2167 peter@eisentraut.org      815         [ +  + ]:           1179 :     for (i = 0; i < desc->natts; i++)
                                816                 :                :     {
                                817                 :            796 :         Form_pg_attribute att = TupleDescAttr(desc, i);
                                818                 :                : 
  416 akapila@postgresql.o      819         [ +  + ]:            796 :         if (!logicalrep_should_publish_column(att, columns,
                                820                 :                :                                               include_gencols_type))
 2167 peter@eisentraut.org      821                 :             71 :             continue;
                                822                 :                : 
                                823         [ +  + ]:            725 :         if (att->atttypid < FirstGenbkiObjectId)
                                824                 :            707 :             continue;
                                825                 :                : 
 2899 peter_e@gmx.net           826                 :             18 :         OutputPluginPrepareWrite(ctx, false);
 2019 akapila@postgresql.o      827                 :             18 :         logicalrep_write_typ(ctx->out, xid, att->atttypid);
 2899 peter_e@gmx.net           828                 :             18 :         OutputPluginWrite(ctx, false);
                                829                 :                :     }
                                830                 :                : 
 2167 peter@eisentraut.org      831                 :            383 :     OutputPluginPrepareWrite(ctx, false);
  416 akapila@postgresql.o      832                 :            383 :     logicalrep_write_rel(ctx->out, xid, relation, columns,
                                833                 :                :                          include_gencols_type);
 2167 peter@eisentraut.org      834                 :            383 :     OutputPluginWrite(ctx, false);
 2899 peter_e@gmx.net           835                 :            383 : }
                                836                 :                : 
                                837                 :                : /*
                                838                 :                :  * Executor state preparation for evaluation of row filter expressions for the
                                839                 :                :  * specified relation.
                                840                 :                :  */
                                841                 :                : static EState *
 1482 akapila@postgresql.o      842                 :             17 : create_estate_for_relation(Relation rel)
                                843                 :                : {
                                844                 :                :     EState     *estate;
                                845                 :                :     RangeTblEntry *rte;
 1105 tgl@sss.pgh.pa.us         846                 :             17 :     List       *perminfos = NIL;
                                847                 :                : 
 1482 akapila@postgresql.o      848                 :             17 :     estate = CreateExecutorState();
                                849                 :                : 
                                850                 :             17 :     rte = makeNode(RangeTblEntry);
                                851                 :             17 :     rte->rtekind = RTE_RELATION;
                                852                 :             17 :     rte->relid = RelationGetRelid(rel);
                                853                 :             17 :     rte->relkind = rel->rd_rel->relkind;
                                854                 :             17 :     rte->rellockmode = AccessShareLock;
                                855                 :                : 
 1105 tgl@sss.pgh.pa.us         856                 :             17 :     addRTEPermissionInfo(&perminfos, rte);
                                857                 :                : 
  401 amitlan@postgresql.o      858                 :             17 :     ExecInitRangeTable(estate, list_make1(rte), perminfos,
                                859                 :                :                        bms_make_singleton(1));
                                860                 :                : 
 1482 akapila@postgresql.o      861                 :             17 :     estate->es_output_cid = GetCurrentCommandId(false);
                                862                 :                : 
                                863                 :             17 :     return estate;
                                864                 :                : }
                                865                 :                : 
                                866                 :                : /*
                                867                 :                :  * Evaluates row filter.
                                868                 :                :  *
                                869                 :                :  * If the row filter evaluates to NULL, it is taken as false i.e. the change
                                870                 :                :  * isn't replicated.
                                871                 :                :  */
                                872                 :                : static bool
                                873                 :             38 : pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
                                874                 :                : {
                                875                 :                :     Datum       ret;
                                876                 :                :     bool        isnull;
                                877                 :                : 
                                878         [ -  + ]:             38 :     Assert(state != NULL);
                                879                 :                : 
                                880                 :             38 :     ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
                                881                 :                : 
                                882   [ -  +  -  -  :             38 :     elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
                                        -  -  -  - ]
                                883                 :                :          isnull ? "false" : DatumGetBool(ret) ? "true" : "false",
                                884                 :                :          isnull ? "true" : "false");
                                885                 :                : 
                                886         [ +  + ]:             38 :     if (isnull)
                                887                 :              1 :         return false;
                                888                 :                : 
                                889                 :             37 :     return DatumGetBool(ret);
                                890                 :                : }
                                891                 :                : 
                                892                 :                : /*
                                893                 :                :  * Make sure the per-entry memory context exists.
                                894                 :                :  */
                                895                 :                : static void
 1450 tomas.vondra@postgre      896                 :            335 : pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
                                897                 :                : {
                                898                 :                :     Relation    relation;
                                899                 :                : 
                                900                 :                :     /* The context may already exist, in which case bail out. */
                                901         [ +  + ]:            335 :     if (entry->entry_cxt)
                                902                 :             17 :         return;
                                903                 :                : 
                                904                 :            318 :     relation = RelationIdGetRelation(entry->publish_as_relid);
                                905                 :                : 
                                906                 :            318 :     entry->entry_cxt = AllocSetContextCreate(data->cachectx,
                                907                 :                :                                              "entry private context",
                                908                 :                :                                              ALLOCSET_SMALL_SIZES);
                                909                 :                : 
                                910                 :            318 :     MemoryContextCopyAndSetIdentifier(entry->entry_cxt,
                                911                 :                :                                       RelationGetRelationName(relation));
                                912                 :                : }
                                913                 :                : 
                                914                 :                : /*
                                915                 :                :  * Initialize the row filter.
                                916                 :                :  */
                                917                 :                : static void
 1482 akapila@postgresql.o      918                 :            318 : pgoutput_row_filter_init(PGOutputData *data, List *publications,
                                919                 :                :                          RelationSyncEntry *entry)
                                920                 :                : {
                                921                 :                :     ListCell   *lc;
                                922                 :            318 :     List       *rfnodes[] = {NIL, NIL, NIL};    /* One per pubaction */
                                923                 :            318 :     bool        no_filter[] = {false, false, false};    /* One per pubaction */
                                924                 :                :     MemoryContext oldctx;
                                925                 :                :     int         idx;
                                926                 :            318 :     bool        has_filter = true;
 1269                           927                 :            318 :     Oid         schemaid = get_rel_namespace(entry->publish_as_relid);
                                928                 :                : 
                                929                 :                :     /*
                                930                 :                :      * Find if there are any row filters for this relation. If there are, then
                                931                 :                :      * prepare the necessary ExprState and cache it in entry->exprstate. To
                                932                 :                :      * build an expression state, we need to ensure the following:
                                933                 :                :      *
                                934                 :                :      * All the given publication-table mappings must be checked.
                                935                 :                :      *
                                936                 :                :      * Multiple publications might have multiple row filters for this
                                937                 :                :      * relation. Since row filter usage depends on the DML operation, there
                                938                 :                :      * are multiple lists (one for each operation) to which row filters will
                                939                 :                :      * be appended.
                                940                 :                :      *
                                941                 :                :      * FOR ALL TABLES and FOR TABLES IN SCHEMA implies "don't use row filter
                                942                 :                :      * expression" so it takes precedence.
                                943                 :                :      */
 1482                           944   [ +  -  +  +  :            339 :     foreach(lc, publications)
                                              +  + ]
                                945                 :                :     {
                                946                 :            322 :         Publication *pub = lfirst(lc);
                                947                 :            322 :         HeapTuple   rftuple = NULL;
                                948                 :            322 :         Datum       rfdatum = 0;
 1269                           949                 :            322 :         bool        pub_no_filter = true;
                                950                 :                : 
                                951                 :                :         /*
                                952                 :                :          * If the publication is FOR ALL TABLES, or the publication includes a
                                953                 :                :          * FOR TABLES IN SCHEMA where the table belongs to the referred
                                954                 :                :          * schema, then it is treated the same as if there are no row filters
                                955                 :                :          * (even if other publications have a row filter).
                                956                 :                :          */
                                957         [ +  + ]:            322 :         if (!pub->alltables &&
                                958         [ +  + ]:            238 :             !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
                                959                 :                :                                    ObjectIdGetDatum(schemaid),
                                960                 :                :                                    ObjectIdGetDatum(pub->oid)))
                                961                 :                :         {
                                962                 :                :             /*
                                963                 :                :              * Check for the presence of a row filter in this publication.
                                964                 :                :              */
 1482                           965                 :            231 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
                                966                 :                :                                       ObjectIdGetDatum(entry->publish_as_relid),
                                967                 :                :                                       ObjectIdGetDatum(pub->oid));
                                968                 :                : 
                                969         [ +  + ]:            231 :             if (HeapTupleIsValid(rftuple))
                                970                 :                :             {
                                971                 :                :                 /* Null indicates no filter. */
                                972                 :            219 :                 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
                                973                 :                :                                           Anum_pg_publication_rel_prqual,
                                974                 :                :                                           &pub_no_filter);
                                975                 :                :             }
                                976                 :                :         }
                                977                 :                : 
                                978         [ +  + ]:            322 :         if (pub_no_filter)
                                979                 :                :         {
                                980         [ +  + ]:            308 :             if (rftuple)
                                981                 :            205 :                 ReleaseSysCache(rftuple);
                                982                 :                : 
                                983                 :            308 :             no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
                                984                 :            308 :             no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
                                985                 :            308 :             no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
                                986                 :                : 
                                987                 :                :             /*
                                988                 :                :              * Quick exit if all the DML actions are publicized via this
                                989                 :                :              * publication.
                                990                 :                :              */
                                991         [ +  - ]:            308 :             if (no_filter[PUBACTION_INSERT] &&
                                992         [ +  + ]:            308 :                 no_filter[PUBACTION_UPDATE] &&
                                993         [ +  - ]:            301 :                 no_filter[PUBACTION_DELETE])
                                994                 :                :             {
                                995                 :            301 :                 has_filter = false;
                                996                 :            301 :                 break;
                                997                 :                :             }
                                998                 :                : 
                                999                 :                :             /* No additional work for this publication. Next one. */
                               1000                 :              7 :             continue;
                               1001                 :                :         }
                               1002                 :                : 
                               1003                 :                :         /* Form the per pubaction row filter lists. */
                               1004   [ +  -  +  - ]:             14 :         if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
                               1005                 :             14 :             rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
                               1006                 :             14 :                                                 TextDatumGetCString(rfdatum));
                               1007   [ +  -  +  - ]:             14 :         if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
                               1008                 :             14 :             rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
                               1009                 :             14 :                                                 TextDatumGetCString(rfdatum));
                               1010   [ +  -  +  - ]:             14 :         if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
                               1011                 :             14 :             rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
                               1012                 :             14 :                                                 TextDatumGetCString(rfdatum));
                               1013                 :                : 
                               1014                 :             14 :         ReleaseSysCache(rftuple);
                               1015                 :                :     }                           /* loop all subscribed publications */
                               1016                 :                : 
                               1017                 :                :     /* Clean the row filter */
                               1018         [ +  + ]:           1272 :     for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
                               1019                 :                :     {
                               1020         [ +  + ]:            954 :         if (no_filter[idx])
                               1021                 :                :         {
                               1022                 :            912 :             list_free_deep(rfnodes[idx]);
                               1023                 :            912 :             rfnodes[idx] = NIL;
                               1024                 :                :         }
                               1025                 :                :     }
                               1026                 :                : 
                               1027         [ +  + ]:            318 :     if (has_filter)
                               1028                 :                :     {
                               1029                 :             17 :         Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
                               1030                 :                : 
 1450 tomas.vondra@postgre     1031                 :             17 :         pgoutput_ensure_entry_cxt(data, entry);
                               1032                 :                : 
                               1033                 :                :         /*
                               1034                 :                :          * Now all the filters for all pubactions are known. Combine them when
                               1035                 :                :          * their pubactions are the same.
                               1036                 :                :          */
                               1037                 :             17 :         oldctx = MemoryContextSwitchTo(entry->entry_cxt);
 1482 akapila@postgresql.o     1038                 :             17 :         entry->estate = create_estate_for_relation(relation);
                               1039         [ +  + ]:             68 :         for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
                               1040                 :                :         {
                               1041                 :             51 :             List       *filters = NIL;
                               1042                 :                :             Expr       *rfnode;
                               1043                 :                : 
                               1044         [ +  + ]:             51 :             if (rfnodes[idx] == NIL)
                               1045                 :             21 :                 continue;
                               1046                 :                : 
                               1047   [ +  -  +  +  :             63 :             foreach(lc, rfnodes[idx])
                                              +  + ]
  401 peter@eisentraut.org     1048                 :             33 :                 filters = lappend(filters, expand_generated_columns_in_expr(stringToNode((char *) lfirst(lc)), relation, 1));
                               1049                 :                : 
                               1050                 :                :             /* combine the row filter and cache the ExprState */
 1482 akapila@postgresql.o     1051                 :             30 :             rfnode = make_orclause(filters);
                               1052                 :             30 :             entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
                               1053                 :                :         }                       /* for each pubaction */
                               1054                 :             17 :         MemoryContextSwitchTo(oldctx);
                               1055                 :                : 
                               1056                 :             17 :         RelationClose(relation);
                               1057                 :                :     }
                               1058                 :            318 : }
                               1059                 :                : 
                               1060                 :                : /*
                               1061                 :                :  * If the table contains a generated column, check for any conflicting
                               1062                 :                :  * values of 'publish_generated_columns' parameter in the publications.
                               1063                 :                :  */
                               1064                 :                : static void
  493                          1065                 :            318 : check_and_init_gencol(PGOutputData *data, List *publications,
                               1066                 :                :                       RelationSyncEntry *entry)
                               1067                 :                : {
                               1068                 :            318 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
                               1069                 :            318 :     TupleDesc   desc = RelationGetDescr(relation);
                               1070                 :            318 :     bool        gencolpresent = false;
                               1071                 :            318 :     bool        first = true;
                               1072                 :                : 
                               1073                 :                :     /* Check if there is any generated column present. */
                               1074         [ +  + ]:            967 :     for (int i = 0; i < desc->natts; i++)
                               1075                 :                :     {
  144 drowley@postgresql.o     1076                 :GNC         656 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
                               1077                 :                : 
  493 akapila@postgresql.o     1078         [ +  + ]:CBC         656 :         if (att->attgenerated)
                               1079                 :                :         {
                               1080                 :              7 :             gencolpresent = true;
                               1081                 :              7 :             break;
                               1082                 :                :         }
                               1083                 :                :     }
                               1084                 :                : 
                               1085                 :                :     /* There are no generated columns to be published. */
                               1086         [ +  + ]:            318 :     if (!gencolpresent)
                               1087                 :                :     {
  416                          1088                 :            311 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
  493                          1089                 :            311 :         return;
                               1090                 :                :     }
                               1091                 :                : 
                               1092                 :                :     /*
                               1093                 :                :      * There may be a conflicting value for 'publish_generated_columns'
                               1094                 :                :      * parameter in the publications.
                               1095                 :                :      */
                               1096   [ +  -  +  +  :             22 :     foreach_ptr(Publication, pub, publications)
                                              +  + ]
                               1097                 :                :     {
                               1098                 :                :         /*
                               1099                 :                :          * The column list takes precedence over the
                               1100                 :                :          * 'publish_generated_columns' parameter. Those will be checked later,
                               1101                 :                :          * see pgoutput_column_list_init.
                               1102                 :                :          */
                               1103         [ +  + ]:              8 :         if (check_and_fetch_column_list(pub, entry->publish_as_relid, NULL, NULL))
                               1104                 :              3 :             continue;
                               1105                 :                : 
                               1106         [ +  - ]:              5 :         if (first)
                               1107                 :                :         {
  416                          1108                 :              5 :             entry->include_gencols_type = pub->pubgencols_type;
  493                          1109                 :              5 :             first = false;
                               1110                 :                :         }
  416 akapila@postgresql.o     1111         [ #  # ]:UBC           0 :         else if (entry->include_gencols_type != pub->pubgencols_type)
  493                          1112         [ #  # ]:              0 :             ereport(ERROR,
                               1113                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1114                 :                :                     errmsg("cannot use different values of publish_generated_columns for table \"%s.%s\" in different publications",
                               1115                 :                :                            get_namespace_name(RelationGetNamespace(relation)),
                               1116                 :                :                            RelationGetRelationName(relation)));
                               1117                 :                :     }
                               1118                 :                : }
                               1119                 :                : 
                               1120                 :                : /*
                               1121                 :                :  * Initialize the column list.
                               1122                 :                :  */
                               1123                 :                : static void
 1450 tomas.vondra@postgre     1124                 :CBC         318 : pgoutput_column_list_init(PGOutputData *data, List *publications,
                               1125                 :                :                           RelationSyncEntry *entry)
                               1126                 :                : {
                               1127                 :                :     ListCell   *lc;
 1382 akapila@postgresql.o     1128                 :            318 :     bool        first = true;
                               1129                 :            318 :     Relation    relation = RelationIdGetRelation(entry->publish_as_relid);
  493                          1130                 :            318 :     bool        found_pub_collist = false;
                               1131                 :            318 :     Bitmapset  *relcols = NULL;
                               1132                 :                : 
                               1133                 :            318 :     pgoutput_ensure_entry_cxt(data, entry);
                               1134                 :                : 
                               1135                 :                :     /*
                               1136                 :                :      * Find if there are any column lists for this relation. If there are,
                               1137                 :                :      * build a bitmap using the column lists.
                               1138                 :                :      *
                               1139                 :                :      * Multiple publications might have multiple column lists for this
                               1140                 :                :      * relation.
                               1141                 :                :      *
                               1142                 :                :      * Note that we don't support the case where the column list is different
                               1143                 :                :      * for the same table when combining publications. See comments atop
                               1144                 :                :      * fetch_relation_list. But one can later change the publication so we
                               1145                 :                :      * still need to check all the given publication-table mappings and report
                               1146                 :                :      * an error if any publications have a different column list.
                               1147                 :                :      */
 1450 tomas.vondra@postgre     1148   [ +  -  +  +  :            646 :     foreach(lc, publications)
                                              +  + ]
                               1149                 :                :     {
                               1150                 :            329 :         Publication *pub = lfirst(lc);
 1382 akapila@postgresql.o     1151                 :            329 :         Bitmapset  *cols = NULL;
                               1152                 :                : 
                               1153                 :                :         /* Retrieve the bitmap of columns for a column list publication. */
  493                          1154                 :            329 :         found_pub_collist |= check_and_fetch_column_list(pub,
                               1155                 :                :                                                          entry->publish_as_relid,
                               1156                 :                :                                                          entry->entry_cxt, &cols);
                               1157                 :                : 
                               1158                 :                :         /*
                               1159                 :                :          * For non-column list publications — e.g. TABLE (without a column
                               1160                 :                :          * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns
                               1161                 :                :          * of the table (including generated columns when
                               1162                 :                :          * 'publish_generated_columns' parameter is true).
                               1163                 :                :          */
                               1164         [ +  + ]:            329 :         if (!cols)
                               1165                 :                :         {
                               1166                 :                :             /*
                               1167                 :                :              * Cache the table columns for the first publication with no
                               1168                 :                :              * specified column list to detect publication with a different
                               1169                 :                :              * column list.
                               1170                 :                :              */
                               1171   [ +  +  +  + ]:            290 :             if (!relcols && (list_length(publications) > 1))
                               1172                 :                :             {
                               1173                 :              9 :                 MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt);
                               1174                 :                : 
  416                          1175                 :              9 :                 relcols = pub_form_cols_map(relation,
                               1176                 :                :                                             entry->include_gencols_type);
  493                          1177                 :              9 :                 MemoryContextSwitchTo(oldcxt);
                               1178                 :                :             }
                               1179                 :                : 
                               1180                 :            290 :             cols = relcols;
                               1181                 :                :         }
                               1182                 :                : 
 1382                          1183         [ +  + ]:            329 :         if (first)
                               1184                 :                :         {
                               1185                 :            318 :             entry->columns = cols;
                               1186                 :            318 :             first = false;
                               1187                 :                :         }
                               1188         [ +  + ]:             11 :         else if (!bms_equal(entry->columns, cols))
                               1189         [ +  - ]:              1 :             ereport(ERROR,
                               1190                 :                :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1191                 :                :                     errmsg("cannot use different column lists for table \"%s.%s\" in different publications",
                               1192                 :                :                            get_namespace_name(RelationGetNamespace(relation)),
                               1193                 :                :                            RelationGetRelationName(relation)));
                               1194                 :                :     }                           /* loop all subscribed publications */
                               1195                 :                : 
                               1196                 :                :     /*
                               1197                 :                :      * If no column list publications exist, columns to be published will be
                               1198                 :                :      * computed later according to the 'publish_generated_columns' parameter.
                               1199                 :                :      */
  493                          1200         [ +  + ]:            317 :     if (!found_pub_collist)
                               1201                 :            281 :         entry->columns = NULL;
                               1202                 :                : 
 1382                          1203                 :            317 :     RelationClose(relation);
 1450 tomas.vondra@postgre     1204                 :            317 : }
                               1205                 :                : 
                               1206                 :                : /*
                               1207                 :                :  * Initialize the slot for storing new and old tuples, and build the map that
                               1208                 :                :  * will be used to convert the relation's tuples into the ancestor's format.
                               1209                 :                :  */
                               1210                 :                : static void
 1482 akapila@postgresql.o     1211                 :            318 : init_tuple_slot(PGOutputData *data, Relation relation,
                               1212                 :                :                 RelationSyncEntry *entry)
                               1213                 :                : {
                               1214                 :                :     MemoryContext oldctx;
                               1215                 :                :     TupleDesc   oldtupdesc;
                               1216                 :                :     TupleDesc   newtupdesc;
                               1217                 :                : 
                               1218                 :            318 :     oldctx = MemoryContextSwitchTo(data->cachectx);
                               1219                 :                : 
                               1220                 :                :     /*
                               1221                 :                :      * Create tuple table slots. Create a copy of the TupleDesc as it needs to
                               1222                 :                :      * live as long as the cache remains.
                               1223                 :                :      */
  839                          1224                 :            318 :     oldtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
                               1225                 :            318 :     newtupdesc = CreateTupleDescCopyConstr(RelationGetDescr(relation));
                               1226                 :                : 
 1482                          1227                 :            318 :     entry->old_slot = MakeSingleTupleTableSlot(oldtupdesc, &TTSOpsHeapTuple);
                               1228                 :            318 :     entry->new_slot = MakeSingleTupleTableSlot(newtupdesc, &TTSOpsHeapTuple);
                               1229                 :                : 
                               1230                 :            318 :     MemoryContextSwitchTo(oldctx);
                               1231                 :                : 
                               1232                 :                :     /*
                               1233                 :                :      * Cache the map that will be used to convert the relation's tuples into
                               1234                 :                :      * the ancestor's format, if needed.
                               1235                 :                :      */
                               1236         [ +  + ]:            318 :     if (entry->publish_as_relid != RelationGetRelid(relation))
                               1237                 :                :     {
                               1238                 :             39 :         Relation    ancestor = RelationIdGetRelation(entry->publish_as_relid);
                               1239                 :             39 :         TupleDesc   indesc = RelationGetDescr(relation);
                               1240                 :             39 :         TupleDesc   outdesc = RelationGetDescr(ancestor);
                               1241                 :                : 
                               1242                 :                :         /* Map must live as long as the logical decoding context. */
  440 michael@paquier.xyz      1243                 :             39 :         oldctx = MemoryContextSwitchTo(data->cachectx);
                               1244                 :                : 
 1202 alvherre@alvh.no-ip.     1245                 :             39 :         entry->attrmap = build_attrmap_by_name_if_req(indesc, outdesc, false);
                               1246                 :                : 
 1482 akapila@postgresql.o     1247                 :             39 :         MemoryContextSwitchTo(oldctx);
                               1248                 :             39 :         RelationClose(ancestor);
                               1249                 :                :     }
                               1250                 :            318 : }
                               1251                 :                : 
                               1252                 :                : /*
                               1253                 :                :  * Change is checked against the row filter if any.
                               1254                 :                :  *
                               1255                 :                :  * Returns true if the change is to be replicated, else false.
                               1256                 :                :  *
                               1257                 :                :  * For inserts, evaluate the row filter for new tuple.
                               1258                 :                :  * For deletes, evaluate the row filter for old tuple.
                               1259                 :                :  * For updates, evaluate the row filter for old and new tuple.
                               1260                 :                :  *
                               1261                 :                :  * For updates, if both evaluations are true, we allow sending the UPDATE and
                               1262                 :                :  * if both the evaluations are false, it doesn't replicate the UPDATE. Now, if
                               1263                 :                :  * only one of the tuples matches the row filter expression, we transform
                               1264                 :                :  * UPDATE to DELETE or INSERT to avoid any data inconsistency based on the
                               1265                 :                :  * following rules:
                               1266                 :                :  *
                               1267                 :                :  * Case 1: old-row (no match)    new-row (no match)  -> (drop change)
                               1268                 :                :  * Case 2: old-row (no match)    new row (match)     -> INSERT
                               1269                 :                :  * Case 3: old-row (match)       new-row (no match)  -> DELETE
                               1270                 :                :  * Case 4: old-row (match)       new row (match)     -> UPDATE
                               1271                 :                :  *
                               1272                 :                :  * The new action is updated in the action parameter.
                               1273                 :                :  *
                               1274                 :                :  * The new slot could be updated when transforming the UPDATE into INSERT,
                               1275                 :                :  * because the original new tuple might not have column values from the replica
                               1276                 :                :  * identity.
                               1277                 :                :  *
                               1278                 :                :  * Examples:
                               1279                 :                :  * Let's say the old tuple satisfies the row filter but the new tuple doesn't.
                               1280                 :                :  * Since the old tuple satisfies, the initial table synchronization copied this
                               1281                 :                :  * row (or another method was used to guarantee that there is data
                               1282                 :                :  * consistency).  However, after the UPDATE the new tuple doesn't satisfy the
                               1283                 :                :  * row filter, so from a data consistency perspective, that row should be
                               1284                 :                :  * removed on the subscriber. The UPDATE should be transformed into a DELETE
                               1285                 :                :  * statement and be sent to the subscriber. Keeping this row on the subscriber
                               1286                 :                :  * is undesirable because it doesn't reflect what was defined in the row filter
                               1287                 :                :  * expression on the publisher. This row on the subscriber would likely not be
                               1288                 :                :  * modified by replication again. If someone inserted a new row with the same
                               1289                 :                :  * old identifier, replication could stop due to a constraint violation.
                               1290                 :                :  *
                               1291                 :                :  * Let's say the old tuple doesn't match the row filter but the new tuple does.
                               1292                 :                :  * Since the old tuple doesn't satisfy, the initial table synchronization
                               1293                 :                :  * probably didn't copy this row. However, after the UPDATE the new tuple does
                               1294                 :                :  * satisfy the row filter, so from a data consistency perspective, that row
                               1295                 :                :  * should be inserted on the subscriber. Otherwise, subsequent UPDATE or DELETE
                               1296                 :                :  * statements have no effect (it matches no row -- see
                               1297                 :                :  * apply_handle_update_internal()). So, the UPDATE should be transformed into a
                               1298                 :                :  * INSERT statement and be sent to the subscriber. However, this might surprise
                               1299                 :                :  * someone who expects the data set to satisfy the row filter expression on the
                               1300                 :                :  * provider.
                               1301                 :                :  */
                               1302                 :                : static bool
                               1303                 :         182262 : pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
                               1304                 :                :                     TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
                               1305                 :                :                     ReorderBufferChangeType *action)
                               1306                 :                : {
                               1307                 :                :     TupleDesc   desc;
                               1308                 :                :     int         i;
                               1309                 :                :     bool        old_matched,
                               1310                 :                :                 new_matched,
                               1311                 :                :                 result;
                               1312                 :                :     TupleTableSlot *tmp_new_slot;
                               1313                 :         182262 :     TupleTableSlot *new_slot = *new_slot_ptr;
                               1314                 :                :     ExprContext *ecxt;
                               1315                 :                :     ExprState  *filter_exprstate;
                               1316                 :                : 
                               1317                 :                :     /*
                               1318                 :                :      * We need this map to avoid relying on ReorderBufferChangeType enums
                               1319                 :                :      * having specific values.
                               1320                 :                :      */
                               1321                 :                :     static const int map_changetype_pubaction[] = {
                               1322                 :                :         [REORDER_BUFFER_CHANGE_INSERT] = PUBACTION_INSERT,
                               1323                 :                :         [REORDER_BUFFER_CHANGE_UPDATE] = PUBACTION_UPDATE,
                               1324                 :                :         [REORDER_BUFFER_CHANGE_DELETE] = PUBACTION_DELETE
                               1325                 :                :     };
                               1326                 :                : 
                               1327   [ +  +  +  +  :         182262 :     Assert(*action == REORDER_BUFFER_CHANGE_INSERT ||
                                              -  + ]
                               1328                 :                :            *action == REORDER_BUFFER_CHANGE_UPDATE ||
                               1329                 :                :            *action == REORDER_BUFFER_CHANGE_DELETE);
                               1330                 :                : 
                               1331   [ +  +  -  + ]:         182262 :     Assert(new_slot || old_slot);
                               1332                 :                : 
                               1333                 :                :     /* Get the corresponding row filter */
                               1334                 :         182262 :     filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
                               1335                 :                : 
                               1336                 :                :     /* Bail out if there is no row filter */
                               1337         [ +  + ]:         182262 :     if (!filter_exprstate)
                               1338                 :         182228 :         return true;
                               1339                 :                : 
                               1340         [ -  + ]:             34 :     elog(DEBUG3, "table \"%s.%s\" has row filter",
                               1341                 :                :          get_namespace_name(RelationGetNamespace(relation)),
                               1342                 :                :          RelationGetRelationName(relation));
                               1343                 :                : 
                               1344         [ +  + ]:             34 :     ResetPerTupleExprContext(entry->estate);
                               1345                 :                : 
                               1346         [ +  + ]:             34 :     ecxt = GetPerTupleExprContext(entry->estate);
                               1347                 :                : 
                               1348                 :                :     /*
                               1349                 :                :      * For the following occasions where there is only one tuple, we can
                               1350                 :                :      * evaluate the row filter for that tuple and return.
                               1351                 :                :      *
                               1352                 :                :      * For inserts, we only have the new tuple.
                               1353                 :                :      *
                               1354                 :                :      * For updates, we can have only a new tuple when none of the replica
                               1355                 :                :      * identity columns changed and none of those columns have external data
                               1356                 :                :      * but we still need to evaluate the row filter for the new tuple as the
                               1357                 :                :      * existing values of those columns might not match the filter. Also,
                               1358                 :                :      * users can use constant expressions in the row filter, so we anyway need
                               1359                 :                :      * to evaluate it for the new tuple.
                               1360                 :                :      *
                               1361                 :                :      * For deletes, we only have the old tuple.
                               1362                 :                :      */
                               1363   [ +  +  +  + ]:             34 :     if (!new_slot || !old_slot)
                               1364                 :                :     {
                               1365         [ +  + ]:             30 :         ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
                               1366                 :             30 :         result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
                               1367                 :                : 
                               1368                 :             30 :         return result;
                               1369                 :                :     }
                               1370                 :                : 
                               1371                 :                :     /*
                               1372                 :                :      * Both the old and new tuples must be valid only for updates and need to
                               1373                 :                :      * be checked against the row filter.
                               1374                 :                :      */
                               1375         [ -  + ]:              4 :     Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
                               1376                 :                : 
                               1377                 :              4 :     slot_getallattrs(new_slot);
                               1378                 :              4 :     slot_getallattrs(old_slot);
                               1379                 :                : 
                               1380                 :              4 :     tmp_new_slot = NULL;
                               1381                 :              4 :     desc = RelationGetDescr(relation);
                               1382                 :                : 
                               1383                 :                :     /*
                               1384                 :                :      * The new tuple might not have all the replica identity columns, in which
                               1385                 :                :      * case it needs to be copied over from the old tuple.
                               1386                 :                :      */
                               1387         [ +  + ]:             12 :     for (i = 0; i < desc->natts; i++)
                               1388                 :                :     {
  450 drowley@postgresql.o     1389                 :              8 :         CompactAttribute *att = TupleDescCompactAttr(desc, i);
                               1390                 :                : 
                               1391                 :                :         /*
                               1392                 :                :          * if the column in the new tuple or old tuple is null, nothing to do
                               1393                 :                :          */
 1482 akapila@postgresql.o     1394   [ +  +  -  + ]:              8 :         if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
                               1395                 :              1 :             continue;
                               1396                 :                : 
                               1397                 :                :         /*
                               1398                 :                :          * Unchanged toasted replica identity columns are only logged in the
                               1399                 :                :          * old tuple. Copy this over to the new tuple. The changed (or WAL
                               1400                 :                :          * Logged) toast values are always assembled in memory and set as
                               1401                 :                :          * VARTAG_INDIRECT. See ReorderBufferToastReplace.
                               1402                 :                :          */
                               1403   [ +  +  +  + ]:             11 :         if (att->attlen == -1 &&
  222 peter@eisentraut.org     1404                 :GNC           4 :             VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(new_slot->tts_values[i])) &&
                               1405         [ +  - ]:              1 :             !VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(old_slot->tts_values[i])))
                               1406                 :                :         {
 1482 akapila@postgresql.o     1407         [ +  - ]:CBC           1 :             if (!tmp_new_slot)
                               1408                 :                :             {
                               1409                 :              1 :                 tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
                               1410                 :              1 :                 ExecClearTuple(tmp_new_slot);
                               1411                 :                : 
                               1412                 :              1 :                 memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
                               1413                 :              1 :                        desc->natts * sizeof(Datum));
                               1414                 :              1 :                 memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
                               1415                 :              1 :                        desc->natts * sizeof(bool));
                               1416                 :                :             }
                               1417                 :                : 
                               1418                 :              1 :             tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
                               1419                 :              1 :             tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
                               1420                 :                :         }
                               1421                 :                :     }
                               1422                 :                : 
                               1423                 :              4 :     ecxt->ecxt_scantuple = old_slot;
                               1424                 :              4 :     old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
                               1425                 :                : 
                               1426         [ +  + ]:              4 :     if (tmp_new_slot)
                               1427                 :                :     {
                               1428                 :              1 :         ExecStoreVirtualTuple(tmp_new_slot);
                               1429                 :              1 :         ecxt->ecxt_scantuple = tmp_new_slot;
                               1430                 :                :     }
                               1431                 :                :     else
                               1432                 :              3 :         ecxt->ecxt_scantuple = new_slot;
                               1433                 :                : 
                               1434                 :              4 :     new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
                               1435                 :                : 
                               1436                 :                :     /*
                               1437                 :                :      * Case 1: if both tuples don't match the row filter, bailout. Send
                               1438                 :                :      * nothing.
                               1439                 :                :      */
                               1440   [ +  +  -  + ]:              4 :     if (!old_matched && !new_matched)
 1482 akapila@postgresql.o     1441                 :UBC           0 :         return false;
                               1442                 :                : 
                               1443                 :                :     /*
                               1444                 :                :      * Case 2: if the old tuple doesn't satisfy the row filter but the new
                               1445                 :                :      * tuple does, transform the UPDATE into INSERT.
                               1446                 :                :      *
                               1447                 :                :      * Use the newly transformed tuple that must contain the column values for
                               1448                 :                :      * all the replica identity columns. This is required to ensure that the
                               1449                 :                :      * while inserting the tuple in the downstream node, we have all the
                               1450                 :                :      * required column values.
                               1451                 :                :      */
 1482 akapila@postgresql.o     1452   [ +  +  +  - ]:CBC           4 :     if (!old_matched && new_matched)
                               1453                 :                :     {
                               1454                 :              2 :         *action = REORDER_BUFFER_CHANGE_INSERT;
                               1455                 :                : 
                               1456         [ +  + ]:              2 :         if (tmp_new_slot)
                               1457                 :              1 :             *new_slot_ptr = tmp_new_slot;
                               1458                 :                :     }
                               1459                 :                : 
                               1460                 :                :     /*
                               1461                 :                :      * Case 3: if the old tuple satisfies the row filter but the new tuple
                               1462                 :                :      * doesn't, transform the UPDATE into DELETE.
                               1463                 :                :      *
                               1464                 :                :      * This transformation does not require another tuple. The Old tuple will
                               1465                 :                :      * be used for DELETE.
                               1466                 :                :      */
                               1467   [ +  -  +  + ]:              2 :     else if (old_matched && !new_matched)
                               1468                 :              1 :         *action = REORDER_BUFFER_CHANGE_DELETE;
                               1469                 :                : 
                               1470                 :                :     /*
                               1471                 :                :      * Case 4: if both tuples match the row filter, transformation isn't
                               1472                 :                :      * required. (*action is default UPDATE).
                               1473                 :                :      */
                               1474                 :                : 
                               1475                 :              4 :     return true;
                               1476                 :                : }
                               1477                 :                : 
                               1478                 :                : /*
                               1479                 :                :  * Sends the decoded DML over wire.
                               1480                 :                :  *
                               1481                 :                :  * This is called both in streaming and non-streaming modes.
                               1482                 :                :  */
                               1483                 :                : static void
 3342 peter_e@gmx.net          1484                 :         186517 : pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                               1485                 :                :                 Relation relation, ReorderBufferChange *change)
                               1486                 :                : {
 3224 bruce@momjian.us         1487                 :         186517 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 1446 akapila@postgresql.o     1488                 :         186517 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
                               1489                 :                :     MemoryContext old;
                               1490                 :                :     RelationSyncEntry *relentry;
 2019                          1491                 :         186517 :     TransactionId xid = InvalidTransactionId;
 1888                          1492                 :         186517 :     Relation    ancestor = NULL;
 1482                          1493                 :         186517 :     Relation    targetrel = relation;
                               1494                 :         186517 :     ReorderBufferChangeType action = change->action;
                               1495                 :         186517 :     TupleTableSlot *old_slot = NULL;
                               1496                 :         186517 :     TupleTableSlot *new_slot = NULL;
                               1497                 :                : 
 2942 peter_e@gmx.net          1498         [ -  + ]:         186517 :     if (!is_publishable_relation(relation))
                               1499                 :           4254 :         return;
                               1500                 :                : 
                               1501                 :                :     /*
                               1502                 :                :      * Remember the xid for the change in streaming mode. We need to send xid
                               1503                 :                :      * with each change in the streaming mode so that subscriber can make
                               1504                 :                :      * their association and on aborts, it can discard the corresponding
                               1505                 :                :      * changes.
                               1506                 :                :      */
  899 michael@paquier.xyz      1507         [ +  + ]:         186517 :     if (data->in_streaming)
 2019 akapila@postgresql.o     1508                 :         175903 :         xid = change->txn->xid;
                               1509                 :                : 
 1482                          1510                 :         186517 :     relentry = get_rel_sync_entry(data, relation);
                               1511                 :                : 
                               1512                 :                :     /* First check the table filter */
                               1513   [ +  +  +  - ]:         186516 :     switch (action)
                               1514                 :                :     {
 3342 peter_e@gmx.net          1515                 :         109091 :         case REORDER_BUFFER_CHANGE_INSERT:
                               1516         [ +  + ]:         109091 :             if (!relentry->pubactions.pubinsert)
                               1517                 :           3144 :                 return;
                               1518                 :         105947 :             break;
                               1519                 :          34483 :         case REORDER_BUFFER_CHANGE_UPDATE:
                               1520         [ +  + ]:          34483 :             if (!relentry->pubactions.pubupdate)
                               1521                 :             44 :                 return;
                               1522                 :          34439 :             break;
                               1523                 :          42942 :         case REORDER_BUFFER_CHANGE_DELETE:
                               1524         [ +  + ]:          42942 :             if (!relentry->pubactions.pubdelete)
                               1525                 :           1066 :                 return;
                               1526                 :                : 
                               1527                 :                :             /*
                               1528                 :                :              * This is only possible if deletes are allowed even when replica
                               1529                 :                :              * identity is not defined for a table. Since the DELETE action
                               1530                 :                :              * can't be published, we simply return.
                               1531                 :                :              */
 1081 akapila@postgresql.o     1532         [ -  + ]:          41876 :             if (!change->data.tp.oldtuple)
                               1533                 :                :             {
 1081 akapila@postgresql.o     1534         [ #  # ]:UBC           0 :                 elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
                               1535                 :              0 :                 return;
                               1536                 :                :             }
 3342 peter_e@gmx.net          1537                 :CBC       41876 :             break;
 3342 peter_e@gmx.net          1538                 :UBC           0 :         default:
                               1539                 :              0 :             Assert(false);
                               1540                 :                :     }
                               1541                 :                : 
                               1542                 :                :     /* Avoid leaking memory by using and resetting our own context */
 3342 peter_e@gmx.net          1543                 :CBC      182262 :     old = MemoryContextSwitchTo(data->context);
                               1544                 :                : 
                               1545                 :                :     /* Switch relation if publishing via root. */
 1081 akapila@postgresql.o     1546         [ +  + ]:         182262 :     if (relentry->publish_as_relid != RelationGetRelid(relation))
                               1547                 :                :     {
                               1548         [ -  + ]:             70 :         Assert(relation->rd_rel->relispartition);
                               1549                 :             70 :         ancestor = RelationIdGetRelation(relentry->publish_as_relid);
                               1550                 :             70 :         targetrel = ancestor;
                               1551                 :                :     }
                               1552                 :                : 
                               1553         [ +  + ]:         182262 :     if (change->data.tp.oldtuple)
                               1554                 :                :     {
                               1555                 :          42014 :         old_slot = relentry->old_slot;
  776 msawada@postgresql.o     1556                 :          42014 :         ExecStoreHeapTuple(change->data.tp.oldtuple, old_slot, false);
                               1557                 :                : 
                               1558                 :                :         /* Convert tuple if needed. */
 1081 akapila@postgresql.o     1559         [ +  + ]:          42014 :         if (relentry->attrmap)
                               1560                 :                :         {
                               1561                 :              5 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
                               1562                 :                :                                                       &TTSOpsVirtual);
                               1563                 :                : 
                               1564                 :              5 :             old_slot = execute_attr_map_slot(relentry->attrmap, old_slot, slot);
                               1565                 :                :         }
                               1566                 :                :     }
                               1567                 :                : 
                               1568         [ +  + ]:         182262 :     if (change->data.tp.newtuple)
                               1569                 :                :     {
                               1570                 :         140386 :         new_slot = relentry->new_slot;
  776 msawada@postgresql.o     1571                 :         140386 :         ExecStoreHeapTuple(change->data.tp.newtuple, new_slot, false);
                               1572                 :                : 
                               1573                 :                :         /* Convert tuple if needed. */
 1081 akapila@postgresql.o     1574         [ +  + ]:         140386 :         if (relentry->attrmap)
                               1575                 :                :         {
                               1576                 :             20 :             TupleTableSlot *slot = MakeTupleTableSlot(RelationGetDescr(targetrel),
                               1577                 :                :                                                       &TTSOpsVirtual);
                               1578                 :                : 
                               1579                 :             20 :             new_slot = execute_attr_map_slot(relentry->attrmap, new_slot, slot);
                               1580                 :                :         }
                               1581                 :                :     }
                               1582                 :                : 
                               1583                 :                :     /*
                               1584                 :                :      * Check row filter.
                               1585                 :                :      *
                               1586                 :                :      * Updates could be transformed to inserts or deletes based on the results
                               1587                 :                :      * of the row filter for old and new tuple.
                               1588                 :                :      */
                               1589         [ +  + ]:         182262 :     if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
                               1590                 :             12 :         goto cleanup;
                               1591                 :                : 
                               1592                 :                :     /*
                               1593                 :                :      * Send BEGIN if we haven't yet.
                               1594                 :                :      *
                               1595                 :                :      * We send the BEGIN message after ensuring that we will actually send the
                               1596                 :                :      * change. This avoids sending a pair of BEGIN/COMMIT messages for empty
                               1597                 :                :      * transactions.
                               1598                 :                :      */
                               1599   [ +  +  +  + ]:         182250 :     if (txndata && !txndata->sent_begin_txn)
                               1600                 :            452 :         pgoutput_send_begin(ctx, txn);
                               1601                 :                : 
                               1602                 :                :     /*
                               1603                 :                :      * Schema should be sent using the original relation because it also sends
                               1604                 :                :      * the ancestor's relation.
                               1605                 :                :      */
                               1606                 :         182249 :     maybe_send_schema(ctx, change, relation, relentry);
                               1607                 :                : 
                               1608                 :         182249 :     OutputPluginPrepareWrite(ctx, true);
                               1609                 :                : 
                               1610                 :                :     /* Send the data */
                               1611   [ +  +  +  - ]:         182249 :     switch (action)
                               1612                 :                :     {
                               1613                 :         105936 :         case REORDER_BUFFER_CHANGE_INSERT:
                               1614                 :         105936 :             logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
  493                          1615                 :         105936 :                                     data->binary, relentry->columns,
                               1616                 :                :                                     relentry->include_gencols_type);
 1081                          1617                 :         105936 :             break;
                               1618                 :          34436 :         case REORDER_BUFFER_CHANGE_UPDATE:
                               1619                 :          34436 :             logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
  493                          1620                 :          34436 :                                     new_slot, data->binary, relentry->columns,
                               1621                 :                :                                     relentry->include_gencols_type);
 1482                          1622                 :          34436 :             break;
 3342 peter_e@gmx.net          1623                 :          41877 :         case REORDER_BUFFER_CHANGE_DELETE:
 1081 akapila@postgresql.o     1624                 :          41877 :             logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
  493                          1625                 :          41877 :                                     data->binary, relentry->columns,
                               1626                 :                :                                     relentry->include_gencols_type);
 3342 peter_e@gmx.net          1627                 :          41877 :             break;
 3342 peter_e@gmx.net          1628                 :UBC           0 :         default:
                               1629                 :              0 :             Assert(false);
                               1630                 :                :     }
                               1631                 :                : 
 1081 akapila@postgresql.o     1632                 :CBC      182249 :     OutputPluginWrite(ctx, true);
                               1633                 :                : 
                               1634                 :         182261 : cleanup:
 1888                          1635         [ +  + ]:         182261 :     if (RelationIsValid(ancestor))
                               1636                 :                :     {
                               1637                 :             69 :         RelationClose(ancestor);
                               1638                 :             69 :         ancestor = NULL;
                               1639                 :                :     }
                               1640                 :                : 
                               1641                 :                :     /* Drop the new slots that were used to store the converted tuples. */
  626                          1642         [ +  + ]:         182261 :     if (relentry->attrmap)
                               1643                 :                :     {
                               1644         [ +  + ]:             25 :         if (old_slot)
                               1645                 :              5 :             ExecDropSingleTupleTableSlot(old_slot);
                               1646                 :                : 
                               1647         [ +  + ]:             25 :         if (new_slot)
                               1648                 :             20 :             ExecDropSingleTupleTableSlot(new_slot);
                               1649                 :                :     }
                               1650                 :                : 
 3342 peter_e@gmx.net          1651                 :         182261 :     MemoryContextSwitchTo(old);
                               1652                 :         182261 :     MemoryContextReset(data->context);
                               1653                 :                : }
                               1654                 :                : 
                               1655                 :                : static void
 2899                          1656                 :             22 : pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                               1657                 :                :                   int nrelations, Relation relations[], ReorderBufferChange *change)
                               1658                 :                : {
                               1659                 :             22 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 1446 akapila@postgresql.o     1660                 :             22 :     PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
                               1661                 :                :     MemoryContext old;
                               1662                 :                :     RelationSyncEntry *relentry;
                               1663                 :                :     int         i;
                               1664                 :                :     int         nrelids;
                               1665                 :                :     Oid        *relids;
 2019                          1666                 :             22 :     TransactionId xid = InvalidTransactionId;
                               1667                 :                : 
                               1668                 :                :     /* Remember the xid for the change in streaming mode. See pgoutput_change. */
  899 michael@paquier.xyz      1669         [ -  + ]:             22 :     if (data->in_streaming)
 2019 akapila@postgresql.o     1670                 :UBC           0 :         xid = change->txn->xid;
                               1671                 :                : 
 2899 peter_e@gmx.net          1672                 :CBC          22 :     old = MemoryContextSwitchTo(data->context);
                               1673                 :                : 
                               1674                 :             22 :     relids = palloc0(nrelations * sizeof(Oid));
                               1675                 :             22 :     nrelids = 0;
                               1676                 :                : 
                               1677         [ +  + ]:             63 :     for (i = 0; i < nrelations; i++)
                               1678                 :                :     {
                               1679                 :             41 :         Relation    relation = relations[i];
                               1680                 :             41 :         Oid         relid = RelationGetRelid(relation);
                               1681                 :                : 
                               1682         [ -  + ]:             41 :         if (!is_publishable_relation(relation))
 2899 peter_e@gmx.net          1683                 :UBC           0 :             continue;
                               1684                 :                : 
 1482 akapila@postgresql.o     1685                 :CBC          41 :         relentry = get_rel_sync_entry(data, relation);
                               1686                 :                : 
 2899 peter_e@gmx.net          1687         [ +  + ]:             41 :         if (!relentry->pubactions.pubtruncate)
                               1688                 :             20 :             continue;
                               1689                 :                : 
                               1690                 :                :         /*
                               1691                 :                :          * Don't send partitions if the publication wants to send only the
                               1692                 :                :          * root tables through it.
                               1693                 :                :          */
 2167 peter@eisentraut.org     1694         [ +  + ]:             21 :         if (relation->rd_rel->relispartition &&
                               1695         [ +  + ]:             15 :             relentry->publish_as_relid != relid)
 2196                          1696                 :              3 :             continue;
                               1697                 :                : 
 2899 peter_e@gmx.net          1698                 :             18 :         relids[nrelids++] = relid;
                               1699                 :                : 
                               1700                 :                :         /* Send BEGIN if we haven't yet */
 1446 akapila@postgresql.o     1701   [ +  -  +  + ]:             18 :         if (txndata && !txndata->sent_begin_txn)
                               1702                 :             12 :             pgoutput_send_begin(ctx, txn);
                               1703                 :                : 
 1683 fujii@postgresql.org     1704                 :             18 :         maybe_send_schema(ctx, change, relation, relentry);
                               1705                 :                :     }
                               1706                 :                : 
 2876 peter_e@gmx.net          1707         [ +  + ]:             22 :     if (nrelids > 0)
                               1708                 :                :     {
                               1709                 :             12 :         OutputPluginPrepareWrite(ctx, true);
                               1710                 :             12 :         logicalrep_write_truncate(ctx->out,
                               1711                 :                :                                   xid,
                               1712                 :                :                                   nrelids,
                               1713                 :                :                                   relids,
                               1714                 :             12 :                                   change->data.truncate.cascade,
                               1715                 :             12 :                                   change->data.truncate.restart_seqs);
                               1716                 :             12 :         OutputPluginWrite(ctx, true);
                               1717                 :                :     }
                               1718                 :                : 
 2899                          1719                 :             22 :     MemoryContextSwitchTo(old);
                               1720                 :             22 :     MemoryContextReset(data->context);
                               1721                 :             22 : }
                               1722                 :                : 
                               1723                 :                : static void
 1804 akapila@postgresql.o     1724                 :              7 : pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                               1725                 :                :                  XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
                               1726                 :                :                  const char *message)
                               1727                 :                : {
                               1728                 :              7 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
                               1729                 :              7 :     TransactionId xid = InvalidTransactionId;
                               1730                 :                : 
                               1731         [ +  + ]:              7 :     if (!data->messages)
                               1732                 :              2 :         return;
                               1733                 :                : 
                               1734                 :                :     /*
                               1735                 :                :      * Remember the xid for the message in streaming mode. See
                               1736                 :                :      * pgoutput_change.
                               1737                 :                :      */
  899 michael@paquier.xyz      1738         [ -  + ]:              5 :     if (data->in_streaming)
 1804 akapila@postgresql.o     1739                 :UBC           0 :         xid = txn->xid;
                               1740                 :                : 
                               1741                 :                :     /*
                               1742                 :                :      * Output BEGIN if we haven't yet. Avoid for non-transactional messages.
                               1743                 :                :      */
 1446 akapila@postgresql.o     1744         [ +  + ]:CBC           5 :     if (transactional)
                               1745                 :                :     {
                               1746                 :              2 :         PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
                               1747                 :                : 
                               1748                 :                :         /* Send BEGIN if we haven't yet */
                               1749   [ +  -  +  - ]:              2 :         if (txndata && !txndata->sent_begin_txn)
                               1750                 :              2 :             pgoutput_send_begin(ctx, txn);
                               1751                 :                :     }
                               1752                 :                : 
 1804                          1753                 :              5 :     OutputPluginPrepareWrite(ctx, true);
                               1754                 :              5 :     logicalrep_write_message(ctx->out,
                               1755                 :                :                              xid,
                               1756                 :                :                              message_lsn,
                               1757                 :                :                              transactional,
                               1758                 :                :                              prefix,
                               1759                 :                :                              sz,
                               1760                 :                :                              message);
                               1761                 :              5 :     OutputPluginWrite(ctx, true);
                               1762                 :                : }
                               1763                 :                : 
                               1764                 :                : /*
                               1765                 :                :  * Return true if the data is associated with an origin and the user has
                               1766                 :                :  * requested the changes that don't have an origin, false otherwise.
                               1767                 :                :  */
                               1768                 :                : static bool
 3342 peter_e@gmx.net          1769                 :         366300 : pgoutput_origin_filter(LogicalDecodingContext *ctx,
                               1770                 :                :                        ReplOriginId origin_id)
                               1771                 :                : {
  900 akapila@postgresql.o     1772                 :         366300 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
                               1773                 :                : 
   46 msawada@postgresql.o     1774   [ +  +  +  + ]:GNC      366300 :     if (data->publish_no_origin && origin_id != InvalidReplOriginId)
 1333 akapila@postgresql.o     1775                 :CBC         182 :         return true;
                               1776                 :                : 
 3342 peter_e@gmx.net          1777                 :         366118 :     return false;
                               1778                 :                : }
                               1779                 :                : 
                               1780                 :                : /*
                               1781                 :                :  * Shutdown the output plugin.
                               1782                 :                :  *
                               1783                 :                :  * Note, we don't need to clean the data->context, data->cachectx, and
                               1784                 :                :  * data->pubctx as they are child contexts of the ctx->context so they
                               1785                 :                :  * will be cleaned up by logical decoding machinery.
                               1786                 :                :  */
                               1787                 :                : static void
 3224 bruce@momjian.us         1788                 :            554 : pgoutput_shutdown(LogicalDecodingContext *ctx)
                               1789                 :                : {
  157 msawada@postgresql.o     1790                 :            554 :     pgoutput_memory_context_reset(NULL);
 3342 peter_e@gmx.net          1791                 :            554 : }
                               1792                 :                : 
                               1793                 :                : /*
                               1794                 :                :  * Load publications from the list of publication names.
                               1795                 :                :  *
                               1796                 :                :  * Here, we skip the publications that don't exist yet. This will allow us
                               1797                 :                :  * to silently continue the replication in the absence of a missing publication.
                               1798                 :                :  * This is required because we allow the users to create publications after they
                               1799                 :                :  * have specified the required publications at the time of replication start.
                               1800                 :                :  */
                               1801                 :                : static List *
                               1802                 :            226 : LoadPublications(List *pubnames)
                               1803                 :                : {
                               1804                 :            226 :     List       *result = NIL;
                               1805                 :                :     ListCell   *lc;
                               1806                 :                : 
 3224 bruce@momjian.us         1807   [ +  -  +  +  :            507 :     foreach(lc, pubnames)
                                              +  + ]
                               1808                 :                :     {
                               1809                 :            281 :         char       *pubname = (char *) lfirst(lc);
  366 akapila@postgresql.o     1810                 :            281 :         Publication *pub = GetPublicationByName(pubname, true);
                               1811                 :                : 
                               1812         [ +  + ]:            281 :         if (pub)
                               1813                 :            278 :             result = lappend(result, pub);
                               1814                 :                :         else
                               1815         [ +  - ]:              3 :             ereport(WARNING,
                               1816                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1817                 :                :                     errmsg("skipped loading publication \"%s\"", pubname),
                               1818                 :                :                     errdetail("The publication does not exist at this point in the WAL."),
                               1819                 :                :                     errhint("Create the publication if it does not exist."));
                               1820                 :                :     }
                               1821                 :                : 
 3342 peter_e@gmx.net          1822                 :            226 :     return result;
                               1823                 :                : }
                               1824                 :                : 
                               1825                 :                : /*
                               1826                 :                :  * Publication syscache invalidation callback.
                               1827                 :                :  *
                               1828                 :                :  * Called for invalidations on pg_publication.
                               1829                 :                :  */
                               1830                 :                : static void
   25 michael@paquier.xyz      1831                 :GNC         361 : publication_invalidation_cb(Datum arg, SysCacheIdentifier cacheid,
                               1832                 :                :                             uint32 hashvalue)
                               1833                 :                : {
 3342 peter_e@gmx.net          1834                 :CBC         361 :     publications_valid = false;
                               1835                 :            361 : }
                               1836                 :                : 
                               1837                 :                : /*
                               1838                 :                :  * START STREAM callback
                               1839                 :                :  */
                               1840                 :                : static void
 2019 akapila@postgresql.o     1841                 :            600 : pgoutput_stream_start(struct LogicalDecodingContext *ctx,
                               1842                 :                :                       ReorderBufferTXN *txn)
                               1843                 :                : {
  899 michael@paquier.xyz      1844                 :            600 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
   46 msawada@postgresql.o     1845                 :GNC         600 :     bool        send_replication_origin = txn->origin_id != InvalidReplOriginId;
                               1846                 :                : 
                               1847                 :                :     /* we can't nest streaming of transactions */
  899 michael@paquier.xyz      1848         [ -  + ]:CBC         600 :     Assert(!data->in_streaming);
                               1849                 :                : 
                               1850                 :                :     /*
                               1851                 :                :      * If we already sent the first stream for this transaction then don't
                               1852                 :                :      * send the origin id in the subsequent streams.
                               1853                 :                :      */
 2019 akapila@postgresql.o     1854         [ +  + ]:            600 :     if (rbtxn_is_streamed(txn))
                               1855                 :            543 :         send_replication_origin = false;
                               1856                 :                : 
                               1857                 :            600 :     OutputPluginPrepareWrite(ctx, !send_replication_origin);
                               1858                 :            600 :     logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
                               1859                 :                : 
 1705                          1860                 :            600 :     send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr,
                               1861                 :                :                      send_replication_origin);
                               1862                 :                : 
 2019                          1863                 :            600 :     OutputPluginWrite(ctx, true);
                               1864                 :                : 
                               1865                 :                :     /* we're streaming a chunk of transaction now */
  899 michael@paquier.xyz      1866                 :            600 :     data->in_streaming = true;
 2019 akapila@postgresql.o     1867                 :            600 : }
                               1868                 :                : 
                               1869                 :                : /*
                               1870                 :                :  * STOP STREAM callback
                               1871                 :                :  */
                               1872                 :                : static void
                               1873                 :            600 : pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
                               1874                 :                :                      ReorderBufferTXN *txn)
                               1875                 :                : {
  899 michael@paquier.xyz      1876                 :            600 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
                               1877                 :                : 
                               1878                 :                :     /* we should be streaming a transaction */
                               1879         [ -  + ]:            600 :     Assert(data->in_streaming);
                               1880                 :                : 
 2019 akapila@postgresql.o     1881                 :            600 :     OutputPluginPrepareWrite(ctx, true);
                               1882                 :            600 :     logicalrep_write_stream_stop(ctx->out);
                               1883                 :            600 :     OutputPluginWrite(ctx, true);
                               1884                 :                : 
                               1885                 :                :     /* we've stopped streaming a transaction */
  899 michael@paquier.xyz      1886                 :            600 :     data->in_streaming = false;
 2019 akapila@postgresql.o     1887                 :            600 : }
                               1888                 :                : 
                               1889                 :                : /*
                               1890                 :                :  * Notify downstream to discard the streamed transaction (along with all
                               1891                 :                :  * its subtransactions, if it's a toplevel transaction).
                               1892                 :                :  */
                               1893                 :                : static void
                               1894                 :             26 : pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
                               1895                 :                :                       ReorderBufferTXN *txn,
                               1896                 :                :                       XLogRecPtr abort_lsn)
                               1897                 :                : {
                               1898                 :                :     ReorderBufferTXN *toptxn;
 1161                          1899                 :             26 :     PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
                               1900                 :             26 :     bool        write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
                               1901                 :                : 
                               1902                 :                :     /*
                               1903                 :                :      * The abort should happen outside streaming block, even for streamed
                               1904                 :                :      * transactions. The transaction has to be marked as streamed, though.
                               1905                 :                :      */
  899 michael@paquier.xyz      1906         [ -  + ]:             26 :     Assert(!data->in_streaming);
                               1907                 :                : 
                               1908                 :                :     /* determine the toplevel transaction */
 1094 akapila@postgresql.o     1909         [ +  + ]:             26 :     toptxn = rbtxn_get_toptxn(txn);
                               1910                 :                : 
 2019                          1911         [ -  + ]:             26 :     Assert(rbtxn_is_streamed(toptxn));
                               1912                 :                : 
                               1913                 :             26 :     OutputPluginPrepareWrite(ctx, true);
 1161                          1914                 :             26 :     logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
                               1915                 :                :                                   txn->abort_time, write_abort_info);
                               1916                 :                : 
 2019                          1917                 :             26 :     OutputPluginWrite(ctx, true);
                               1918                 :                : 
                               1919                 :             26 :     cleanup_rel_sync_cache(toptxn->xid, false);
                               1920                 :             26 : }
                               1921                 :                : 
                               1922                 :                : /*
                               1923                 :                :  * Notify downstream to apply the streamed transaction (along with all
                               1924                 :                :  * its subtransactions).
                               1925                 :                :  */
                               1926                 :                : static void
                               1927                 :             44 : pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
                               1928                 :                :                        ReorderBufferTXN *txn,
                               1929                 :                :                        XLogRecPtr commit_lsn)
                               1930                 :                : {
  899 michael@paquier.xyz      1931                 :             44 :     PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
                               1932                 :                : 
                               1933                 :                :     /*
                               1934                 :                :      * The commit should happen outside streaming block, even for streamed
                               1935                 :                :      * transactions. The transaction has to be marked as streamed, though.
                               1936                 :                :      */
                               1937         [ -  + ]:             44 :     Assert(!data->in_streaming);
 2019 akapila@postgresql.o     1938         [ -  + ]:             44 :     Assert(rbtxn_is_streamed(txn));
                               1939                 :                : 
 1131                          1940                 :             44 :     OutputPluginUpdateProgress(ctx, false);
                               1941                 :                : 
 2019                          1942                 :             44 :     OutputPluginPrepareWrite(ctx, true);
                               1943                 :             44 :     logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
                               1944                 :             44 :     OutputPluginWrite(ctx, true);
                               1945                 :                : 
                               1946                 :             44 :     cleanup_rel_sync_cache(txn->xid, true);
                               1947                 :             44 : }
                               1948                 :                : 
                               1949                 :                : /*
                               1950                 :                :  * PREPARE callback (for streaming two-phase commit).
                               1951                 :                :  *
                               1952                 :                :  * Notify the downstream to prepare the transaction.
                               1953                 :                :  */
                               1954                 :                : static void
 1684                          1955                 :             10 : pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
                               1956                 :                :                             ReorderBufferTXN *txn,
                               1957                 :                :                             XLogRecPtr prepare_lsn)
                               1958                 :                : {
                               1959         [ -  + ]:             10 :     Assert(rbtxn_is_streamed(txn));
                               1960                 :                : 
 1131                          1961                 :             10 :     OutputPluginUpdateProgress(ctx, false);
 1684                          1962                 :             10 :     OutputPluginPrepareWrite(ctx, true);
                               1963                 :             10 :     logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
                               1964                 :             10 :     OutputPluginWrite(ctx, true);
                               1965                 :             10 : }
                               1966                 :                : 
                               1967                 :                : /*
                               1968                 :                :  * Initialize the relation schema sync cache for a decoding session.
                               1969                 :                :  *
                               1970                 :                :  * The hash table is destroyed at the end of a decoding session. While
                               1971                 :                :  * relcache invalidations still exist and will still be invoked, they
                               1972                 :                :  * will just see the null hash table global and take no action.
                               1973                 :                :  */
                               1974                 :                : static void
 3342 peter_e@gmx.net          1975                 :            426 : init_rel_sync_cache(MemoryContext cachectx)
                               1976                 :                : {
                               1977                 :                :     HASHCTL     ctl;
                               1978                 :                :     static bool relation_callbacks_registered = false;
                               1979                 :                : 
                               1980                 :                :     /* Nothing to do if hash table already exists */
                               1981         [ -  + ]:            426 :     if (RelationSyncCache != NULL)
                               1982                 :              2 :         return;
                               1983                 :                : 
                               1984                 :                :     /* Make a new hash table for the cache */
                               1985                 :            426 :     ctl.keysize = sizeof(Oid);
                               1986                 :            426 :     ctl.entrysize = sizeof(RelationSyncEntry);
                               1987                 :            426 :     ctl.hcxt = cachectx;
                               1988                 :                : 
                               1989                 :            426 :     RelationSyncCache = hash_create("logical replication output relation cache",
                               1990                 :                :                                     128, &ctl,
                               1991                 :                :                                     HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
                               1992                 :                : 
                               1993         [ -  + ]:            426 :     Assert(RelationSyncCache != NULL);
                               1994                 :                : 
                               1995                 :                :     /* No more to do if we already registered callbacks */
 1116 tgl@sss.pgh.pa.us        1996         [ +  + ]:            426 :     if (relation_callbacks_registered)
                               1997                 :              2 :         return;
                               1998                 :                : 
                               1999                 :                :     /* We must update the cache entry for a relation after a relcache flush */
 3342 peter_e@gmx.net          2000                 :            424 :     CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
                               2001                 :                : 
                               2002                 :                :     /*
                               2003                 :                :      * Flush all cache entries after a pg_namespace change, in case it was a
                               2004                 :                :      * schema rename affecting a relation being replicated.
                               2005                 :                :      *
                               2006                 :                :      * XXX: It is not a good idea to invalidate all the relation entries in
                               2007                 :                :      * RelationSyncCache on schema rename. We can optimize it to invalidate
                               2008                 :                :      * only the required relations by either having a specific invalidation
                               2009                 :                :      * message containing impacted relations or by having schema information
                               2010                 :                :      * in each RelationSyncCache entry and using hashvalue of pg_namespace.oid
                               2011                 :                :      * passed to the callback.
                               2012                 :                :      */
 1164 tgl@sss.pgh.pa.us        2013                 :            424 :     CacheRegisterSyscacheCallback(NAMESPACEOID,
                               2014                 :                :                                   rel_sync_cache_publication_cb,
                               2015                 :                :                                   (Datum) 0);
                               2016                 :                : 
 1116                          2017                 :            424 :     relation_callbacks_registered = true;
                               2018                 :                : }
                               2019                 :                : 
                               2020                 :                : /*
                               2021                 :                :  * We expect relatively small number of streamed transactions.
                               2022                 :                :  */
                               2023                 :                : static bool
 2019 akapila@postgresql.o     2024                 :         175903 : get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
                               2025                 :                : {
 1350 alvherre@alvh.no-ip.     2026                 :         175903 :     return list_member_xid(entry->streamed_txns, xid);
                               2027                 :                : }
                               2028                 :                : 
                               2029                 :                : /*
                               2030                 :                :  * Add the xid in the rel sync entry for which we have already sent the schema
                               2031                 :                :  * of the relation.
                               2032                 :                :  */
                               2033                 :                : static void
 2019 akapila@postgresql.o     2034                 :             66 : set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
                               2035                 :                : {
                               2036                 :                :     MemoryContext oldctx;
                               2037                 :                : 
                               2038                 :             66 :     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
                               2039                 :                : 
 1350 alvherre@alvh.no-ip.     2040                 :             66 :     entry->streamed_txns = lappend_xid(entry->streamed_txns, xid);
                               2041                 :                : 
 2019 akapila@postgresql.o     2042                 :             66 :     MemoryContextSwitchTo(oldctx);
                               2043                 :             66 : }
                               2044                 :                : 
                               2045                 :                : /*
                               2046                 :                :  * Find or create entry in the relation schema cache.
                               2047                 :                :  *
                               2048                 :                :  * This looks up publications that the given relation is directly or
                               2049                 :                :  * indirectly part of (the latter if it's really the relation's ancestor that
                               2050                 :                :  * is part of a publication) and fills up the found entry with the information
                               2051                 :                :  * about which operations to publish and whether to use an ancestor's schema
                               2052                 :                :  * when publishing.
                               2053                 :                :  */
                               2054                 :                : static RelationSyncEntry *
 1482                          2055                 :         186558 : get_rel_sync_entry(PGOutputData *data, Relation relation)
                               2056                 :                : {
                               2057                 :                :     RelationSyncEntry *entry;
                               2058                 :                :     bool        found;
                               2059                 :                :     MemoryContext oldctx;
                               2060                 :         186558 :     Oid         relid = RelationGetRelid(relation);
                               2061                 :                : 
 3342 peter_e@gmx.net          2062         [ -  + ]:         186558 :     Assert(RelationSyncCache != NULL);
                               2063                 :                : 
                               2064                 :                :     /* Find cached relation info, creating if not found */
                               2065                 :         186558 :     entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
                               2066                 :                :                                               &relid,
                               2067                 :                :                                               HASH_ENTER, &found);
                               2068         [ -  + ]:         186558 :     Assert(entry != NULL);
                               2069                 :                : 
                               2070                 :                :     /* initialize entry, if it's new */
 2006 akapila@postgresql.o     2071         [ +  + ]:         186558 :     if (!found)
                               2072                 :                :     {
 1500                          2073                 :            327 :         entry->replicate_valid = false;
 2006                          2074                 :            327 :         entry->schema_sent = false;
  416                          2075                 :            327 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
 2006                          2076                 :            327 :         entry->streamed_txns = NIL;
                               2077                 :            327 :         entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 1438 tomas.vondra@postgre     2078                 :            327 :             entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 1482 akapila@postgresql.o     2079                 :            327 :         entry->new_slot = NULL;
                               2080                 :            327 :         entry->old_slot = NULL;
                               2081                 :            327 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
 1450 tomas.vondra@postgre     2082                 :            327 :         entry->entry_cxt = NULL;
 2006 akapila@postgresql.o     2083                 :            327 :         entry->publish_as_relid = InvalidOid;
 1450 tomas.vondra@postgre     2084                 :            327 :         entry->columns = NULL;
 1482 akapila@postgresql.o     2085                 :            327 :         entry->attrmap = NULL;
                               2086                 :                :     }
                               2087                 :                : 
                               2088                 :                :     /* Validate the entry */
 2006                          2089         [ +  + ]:         186558 :     if (!entry->replicate_valid)
                               2090                 :                :     {
 1600                          2091                 :            423 :         Oid         schemaId = get_rel_namespace(relid);
   11 akapila@postgresql.o     2092                 :GNC         423 :         List       *pubids = GetRelationIncludedPublications(relid);
                               2093                 :                : 
                               2094                 :                :         /*
                               2095                 :                :          * We don't acquire a lock on the namespace system table as we build
                               2096                 :                :          * the cache entry using a historic snapshot and all the later changes
                               2097                 :                :          * are absorbed while decoding WAL.
                               2098                 :                :          */
 1438 tomas.vondra@postgre     2099                 :CBC         423 :         List       *schemaPubids = GetSchemaPublications(schemaId);
                               2100                 :                :         ListCell   *lc;
 2167 peter@eisentraut.org     2101                 :            423 :         Oid         publish_as_relid = relid;
 1460 tomas.vondra@postgre     2102                 :            423 :         int         publish_ancestor_level = 0;
 1530 michael@paquier.xyz      2103                 :            423 :         bool        am_partition = get_rel_relispartition(relid);
 1438 tomas.vondra@postgre     2104                 :            423 :         char        relkind = get_rel_relkind(relid);
 1482 akapila@postgresql.o     2105                 :            423 :         List       *rel_publications = NIL;
                               2106                 :                : 
                               2107                 :                :         /* Reload publications if needed before use. */
 3342 peter_e@gmx.net          2108         [ +  + ]:            423 :         if (!publications_valid)
                               2109                 :                :         {
  461 michael@paquier.xyz      2110                 :            226 :             MemoryContextReset(data->pubctx);
                               2111                 :                : 
                               2112                 :            226 :             oldctx = MemoryContextSwitchTo(data->pubctx);
 3342 peter_e@gmx.net          2113                 :            226 :             data->publications = LoadPublications(data->publication_names);
                               2114                 :            226 :             MemoryContextSwitchTo(oldctx);
                               2115                 :            226 :             publications_valid = true;
                               2116                 :                :         }
                               2117                 :                : 
                               2118                 :                :         /*
                               2119                 :                :          * Reset schema_sent status as the relation definition may have
                               2120                 :                :          * changed.  Also reset pubactions to empty in case rel was dropped
                               2121                 :                :          * from a publication.  Also free any objects that depended on the
                               2122                 :                :          * earlier definition.
                               2123                 :                :          */
 1500 akapila@postgresql.o     2124                 :            423 :         entry->schema_sent = false;
  416                          2125                 :            423 :         entry->include_gencols_type = PUBLISH_GENCOLS_NONE;
 1500                          2126                 :            423 :         list_free(entry->streamed_txns);
                               2127                 :            423 :         entry->streamed_txns = NIL;
 1450 tomas.vondra@postgre     2128                 :            423 :         bms_free(entry->columns);
                               2129                 :            423 :         entry->columns = NULL;
 1500 akapila@postgresql.o     2130                 :            423 :         entry->pubactions.pubinsert = false;
                               2131                 :            423 :         entry->pubactions.pubupdate = false;
                               2132                 :            423 :         entry->pubactions.pubdelete = false;
                               2133                 :            423 :         entry->pubactions.pubtruncate = false;
                               2134                 :                : 
                               2135                 :                :         /*
                               2136                 :                :          * Tuple slots cleanups. (Will be rebuilt later if needed).
                               2137                 :                :          */
 1482                          2138         [ +  + ]:            423 :         if (entry->old_slot)
                               2139                 :                :         {
  479 michael@paquier.xyz      2140                 :             63 :             TupleDesc   desc = entry->old_slot->tts_tupleDescriptor;
                               2141                 :                : 
                               2142         [ -  + ]:             63 :             Assert(desc->tdrefcount == -1);
                               2143                 :                : 
 1482 akapila@postgresql.o     2144                 :             63 :             ExecDropSingleTupleTableSlot(entry->old_slot);
                               2145                 :                : 
                               2146                 :                :             /*
                               2147                 :                :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
                               2148                 :                :              * do it now to avoid any leaks.
                               2149                 :                :              */
  479 michael@paquier.xyz      2150                 :             63 :             FreeTupleDesc(desc);
                               2151                 :                :         }
 1482 akapila@postgresql.o     2152         [ +  + ]:            423 :         if (entry->new_slot)
                               2153                 :                :         {
  479 michael@paquier.xyz      2154                 :             63 :             TupleDesc   desc = entry->new_slot->tts_tupleDescriptor;
                               2155                 :                : 
                               2156         [ -  + ]:             63 :             Assert(desc->tdrefcount == -1);
                               2157                 :                : 
 1482 akapila@postgresql.o     2158                 :             63 :             ExecDropSingleTupleTableSlot(entry->new_slot);
                               2159                 :                : 
                               2160                 :                :             /*
                               2161                 :                :              * ExecDropSingleTupleTableSlot() would not free the TupleDesc, so
                               2162                 :                :              * do it now to avoid any leaks.
                               2163                 :                :              */
  479 michael@paquier.xyz      2164                 :             63 :             FreeTupleDesc(desc);
                               2165                 :                :         }
                               2166                 :                : 
 1482 akapila@postgresql.o     2167                 :            423 :         entry->old_slot = NULL;
                               2168                 :            423 :         entry->new_slot = NULL;
                               2169                 :                : 
                               2170         [ +  + ]:            423 :         if (entry->attrmap)
                               2171                 :              3 :             free_attrmap(entry->attrmap);
                               2172                 :            423 :         entry->attrmap = NULL;
                               2173                 :                : 
                               2174                 :                :         /*
                               2175                 :                :          * Row filter cache cleanups.
                               2176                 :                :          */
 1450 tomas.vondra@postgre     2177         [ +  + ]:            423 :         if (entry->entry_cxt)
                               2178                 :             63 :             MemoryContextDelete(entry->entry_cxt);
                               2179                 :                : 
                               2180                 :            423 :         entry->entry_cxt = NULL;
 1482 akapila@postgresql.o     2181                 :            423 :         entry->estate = NULL;
                               2182                 :            423 :         memset(entry->exprstate, 0, sizeof(entry->exprstate));
                               2183                 :                : 
                               2184                 :                :         /*
                               2185                 :                :          * Build publication cache. We can't use one provided by relcache as
                               2186                 :                :          * relcache considers all publications that the given relation is in,
                               2187                 :                :          * but here we only need to consider ones that the subscriber
                               2188                 :                :          * requested.
                               2189                 :                :          */
 3342 peter_e@gmx.net          2190   [ +  +  +  +  :           1021 :         foreach(lc, data->publications)
                                              +  + ]
                               2191                 :                :         {
                               2192                 :            598 :             Publication *pub = lfirst(lc);
 2167 peter@eisentraut.org     2193                 :            598 :             bool        publish = false;
                               2194                 :                : 
                               2195                 :                :             /*
                               2196                 :                :              * Under what relid should we publish changes in this publication?
                               2197                 :                :              * We'll use the top-most relid across all publications. Also
                               2198                 :                :              * track the ancestor level for this publication.
                               2199                 :                :              */
 1403 tgl@sss.pgh.pa.us        2200                 :            598 :             Oid         pub_relid = relid;
                               2201                 :            598 :             int         ancestor_level = 0;
                               2202                 :                : 
                               2203                 :                :             /*
                               2204                 :                :              * If this is a FOR ALL TABLES publication, pick the partition
                               2205                 :                :              * root and set the ancestor level accordingly.
                               2206                 :                :              */
 1438 tomas.vondra@postgre     2207         [ +  + ]:            598 :             if (pub->alltables)
                               2208                 :                :             {
   11 akapila@postgresql.o     2209                 :GNC         101 :                 List       *exceptpubids = NIL;
                               2210                 :                : 
                               2211         [ +  + ]:            101 :                 if (am_partition)
                               2212                 :                :                 {
 1460 tomas.vondra@postgre     2213                 :CBC          33 :                     List       *ancestors = get_partition_ancestors(relid);
   11 akapila@postgresql.o     2214                 :GNC          33 :                     Oid         last_ancestor_relid = llast_oid(ancestors);
                               2215                 :                : 
                               2216                 :                :                     /*
                               2217                 :                :                      * For a partition, changes are published via top-most
                               2218                 :                :                      * ancestor when pubviaroot is true, so populate pub_relid
                               2219                 :                :                      * accordingly.
                               2220                 :                :                      */
                               2221         [ +  + ]:             33 :                     if (pub->pubviaroot)
                               2222                 :                :                     {
                               2223                 :             19 :                         pub_relid = last_ancestor_relid;
                               2224                 :             19 :                         ancestor_level = list_length(ancestors);
                               2225                 :                :                     }
                               2226                 :                : 
                               2227                 :                :                     /*
                               2228                 :                :                      * Only the top-most ancestor can appear in the EXCEPT
                               2229                 :                :                      * clause. Therefore, for a partition, exclusion must be
                               2230                 :                :                      * evaluated at the top-most ancestor.
                               2231                 :                :                      */
                               2232                 :             33 :                     exceptpubids = GetRelationExcludedPublications(last_ancestor_relid);
                               2233                 :                :                 }
                               2234                 :                :                 else
                               2235                 :                :                 {
                               2236                 :                :                     /*
                               2237                 :                :                      * For a regular table or a root partitioned table, check
                               2238                 :                :                      * exclusion on table itself.
                               2239                 :                :                      */
                               2240                 :             68 :                     exceptpubids = GetRelationExcludedPublications(pub_relid);
                               2241                 :                :                 }
                               2242                 :                : 
                               2243         [ +  + ]:            101 :                 if (!list_member_oid(exceptpubids, pub->oid))
                               2244                 :             86 :                     publish = true;
                               2245                 :                : 
                               2246                 :            101 :                 list_free(exceptpubids);
                               2247                 :                : 
                               2248         [ +  + ]:            101 :                 if (!publish)
                               2249                 :             15 :                     continue;
                               2250                 :                :             }
                               2251                 :                : 
 2167 peter@eisentraut.org     2252         [ +  + ]:CBC         583 :             if (!publish)
                               2253                 :                :             {
 2131 tgl@sss.pgh.pa.us        2254                 :            497 :                 bool        ancestor_published = false;
                               2255                 :                : 
                               2256                 :                :                 /*
                               2257                 :                :                  * For a partition, check if any of the ancestors are
                               2258                 :                :                  * published.  If so, note down the topmost ancestor that is
                               2259                 :                :                  * published via this publication, which will be used as the
                               2260                 :                :                  * relation via which to publish the partition's changes.
                               2261                 :                :                  */
 2167 peter@eisentraut.org     2262         [ +  + ]:            497 :                 if (am_partition)
                               2263                 :                :                 {
                               2264                 :                :                     Oid         ancestor;
                               2265                 :                :                     int         level;
 2131 tgl@sss.pgh.pa.us        2266                 :            121 :                     List       *ancestors = get_partition_ancestors(relid);
                               2267                 :                : 
 1482 akapila@postgresql.o     2268                 :            121 :                     ancestor = GetTopMostAncestorInPublication(pub->oid,
                               2269                 :                :                                                                ancestors,
                               2270                 :                :                                                                &level);
                               2271                 :                : 
                               2272         [ +  + ]:            121 :                     if (ancestor != InvalidOid)
                               2273                 :                :                     {
                               2274                 :             48 :                         ancestor_published = true;
                               2275         [ +  + ]:             48 :                         if (pub->pubviaroot)
                               2276                 :                :                         {
 1460 tomas.vondra@postgre     2277                 :             25 :                             pub_relid = ancestor;
                               2278                 :             25 :                             ancestor_level = level;
                               2279                 :                :                         }
                               2280                 :                :                     }
                               2281                 :                :                 }
                               2282                 :                : 
 1600 akapila@postgresql.o     2283   [ +  +  +  + ]:            780 :                 if (list_member_oid(pubids, pub->oid) ||
                               2284         [ +  + ]:            559 :                     list_member_oid(schemaPubids, pub->oid) ||
                               2285                 :                :                     ancestor_published)
 2167 peter@eisentraut.org     2286                 :            249 :                     publish = true;
                               2287                 :                :             }
                               2288                 :                : 
                               2289                 :                :             /*
                               2290                 :                :              * If the relation is to be published, determine actions to
                               2291                 :                :              * publish, and list of columns, if appropriate.
                               2292                 :                :              *
                               2293                 :                :              * Don't publish changes for partitioned tables, because
                               2294                 :                :              * publishing those of its partitions suffices, unless partition
                               2295                 :                :              * changes won't be published due to pubviaroot being set.
                               2296                 :                :              */
                               2297   [ +  +  +  + ]:            583 :             if (publish &&
                               2298         [ +  + ]:              4 :                 (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
                               2299                 :                :             {
 3342 peter_e@gmx.net          2300                 :            332 :                 entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
                               2301                 :            332 :                 entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
                               2302                 :            332 :                 entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
 2899                          2303                 :            332 :                 entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
                               2304                 :                : 
                               2305                 :                :                 /*
                               2306                 :                :                  * We want to publish the changes as the top-most ancestor
                               2307                 :                :                  * across all publications. So we need to check if the already
                               2308                 :                :                  * calculated level is higher than the new one. If yes, we can
                               2309                 :                :                  * ignore the new value (as it's a child). Otherwise the new
                               2310                 :                :                  * value is an ancestor, so we keep it.
                               2311                 :                :                  */
 1460 tomas.vondra@postgre     2312         [ +  + ]:            332 :                 if (publish_ancestor_level > ancestor_level)
                               2313                 :              1 :                     continue;
                               2314                 :                : 
                               2315                 :                :                 /*
                               2316                 :                :                  * If we found an ancestor higher up in the tree, discard the
                               2317                 :                :                  * list of publications through which we replicate it, and use
                               2318                 :                :                  * the new ancestor.
                               2319                 :                :                  */
 1459                          2320         [ +  + ]:            331 :                 if (publish_ancestor_level < ancestor_level)
                               2321                 :                :                 {
                               2322                 :             40 :                     publish_as_relid = pub_relid;
                               2323                 :             40 :                     publish_ancestor_level = ancestor_level;
                               2324                 :                : 
                               2325                 :                :                     /* reset the publication list for this relation */
                               2326                 :             40 :                     rel_publications = NIL;
                               2327                 :                :                 }
                               2328                 :                :                 else
                               2329                 :                :                 {
                               2330                 :                :                     /* Same ancestor level, has to be the same OID. */
                               2331         [ -  + ]:            291 :                     Assert(publish_as_relid == pub_relid);
                               2332                 :                :                 }
                               2333                 :                : 
                               2334                 :                :                 /* Track publications for this ancestor. */
                               2335                 :            331 :                 rel_publications = lappend(rel_publications, pub);
                               2336                 :                :             }
                               2337                 :                :         }
                               2338                 :                : 
 1482 akapila@postgresql.o     2339                 :            423 :         entry->publish_as_relid = publish_as_relid;
                               2340                 :                : 
                               2341                 :                :         /*
                               2342                 :                :          * Initialize the tuple slot, map, and row filter. These are only used
                               2343                 :                :          * when publishing inserts, updates, or deletes.
                               2344                 :                :          */
                               2345   [ +  +  +  - ]:            423 :         if (entry->pubactions.pubinsert || entry->pubactions.pubupdate ||
                               2346         [ -  + ]:            105 :             entry->pubactions.pubdelete)
                               2347                 :                :         {
                               2348                 :                :             /* Initialize the tuple slot and map */
                               2349                 :            318 :             init_tuple_slot(data, relation, entry);
                               2350                 :                : 
                               2351                 :                :             /* Initialize the row filter */
                               2352                 :            318 :             pgoutput_row_filter_init(data, rel_publications, entry);
                               2353                 :                : 
                               2354                 :                :             /* Check whether to publish generated columns. */
  493                          2355                 :            318 :             check_and_init_gencol(data, rel_publications, entry);
                               2356                 :                : 
                               2357                 :                :             /* Initialize the column list */
 1450 tomas.vondra@postgre     2358                 :            318 :             pgoutput_column_list_init(data, rel_publications, entry);
                               2359                 :                :         }
                               2360                 :                : 
 3342 peter_e@gmx.net          2361                 :            422 :         list_free(pubids);
 1500 akapila@postgresql.o     2362                 :            422 :         list_free(schemaPubids);
 1482                          2363                 :            422 :         list_free(rel_publications);
                               2364                 :                : 
 3342 peter_e@gmx.net          2365                 :            422 :         entry->replicate_valid = true;
                               2366                 :                :     }
                               2367                 :                : 
                               2368                 :         186557 :     return entry;
                               2369                 :                : }
                               2370                 :                : 
                               2371                 :                : /*
                               2372                 :                :  * Cleanup list of streamed transactions and update the schema_sent flag.
                               2373                 :                :  *
                               2374                 :                :  * When a streamed transaction commits or aborts, we need to remove the
                               2375                 :                :  * toplevel XID from the schema cache. If the transaction aborted, the
                               2376                 :                :  * subscriber will simply throw away the schema records we streamed, so
                               2377                 :                :  * we don't need to do anything else.
                               2378                 :                :  *
                               2379                 :                :  * If the transaction is committed, the subscriber will update the relation
                               2380                 :                :  * cache - so tweak the schema_sent flag accordingly.
                               2381                 :                :  */
                               2382                 :                : static void
 2019 akapila@postgresql.o     2383                 :             70 : cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
                               2384                 :                : {
                               2385                 :                :     HASH_SEQ_STATUS hash_seq;
                               2386                 :                :     RelationSyncEntry *entry;
                               2387                 :                : 
                               2388         [ -  + ]:             70 :     Assert(RelationSyncCache != NULL);
                               2389                 :                : 
                               2390                 :             70 :     hash_seq_init(&hash_seq, RelationSyncCache);
                               2391         [ +  + ]:            143 :     while ((entry = hash_seq_search(&hash_seq)) != NULL)
                               2392                 :                :     {
                               2393                 :                :         /*
                               2394                 :                :          * We can set the schema_sent flag for an entry that has committed xid
                               2395                 :                :          * in the list as that ensures that the subscriber would have the
                               2396                 :                :          * corresponding schema and we don't need to send it unless there is
                               2397                 :                :          * any invalidation for that relation.
                               2398                 :                :          */
  801 nathan@postgresql.or     2399   [ +  +  +  -  :            164 :         foreach_xid(streamed_txn, entry->streamed_txns)
                                              +  + ]
                               2400                 :                :         {
                               2401         [ +  + ]:             70 :             if (xid == streamed_txn)
                               2402                 :                :             {
 2019 akapila@postgresql.o     2403         [ +  + ]:             52 :                 if (is_commit)
                               2404                 :             41 :                     entry->schema_sent = true;
                               2405                 :                : 
                               2406                 :             52 :                 entry->streamed_txns =
  801 nathan@postgresql.or     2407                 :             52 :                     foreach_delete_current(entry->streamed_txns, streamed_txn);
 2019 akapila@postgresql.o     2408                 :             52 :                 break;
                               2409                 :                :             }
                               2410                 :                :         }
                               2411                 :                :     }
                               2412                 :             70 : }
                               2413                 :                : 
                               2414                 :                : /*
                               2415                 :                :  * Relcache invalidation callback
                               2416                 :                :  */
                               2417                 :                : static void
 3342 peter_e@gmx.net          2418                 :           4174 : rel_sync_cache_relation_cb(Datum arg, Oid relid)
                               2419                 :                : {
                               2420                 :                :     RelationSyncEntry *entry;
                               2421                 :                : 
                               2422                 :                :     /*
                               2423                 :                :      * We can get here if the plugin was used in SQL interface as the
                               2424                 :                :      * RelationSyncCache is destroyed when the decoding finishes, but there is
                               2425                 :                :      * no way to unregister the relcache invalidation callback.
                               2426                 :                :      */
                               2427         [ +  + ]:           4174 :     if (RelationSyncCache == NULL)
                               2428                 :             32 :         return;
                               2429                 :                : 
                               2430                 :                :     /*
                               2431                 :                :      * Nobody keeps pointers to entries in this hash table around outside
                               2432                 :                :      * logical decoding callback calls - but invalidation events can come in
                               2433                 :                :      * *during* a callback if we do any syscache access in the callback.
                               2434                 :                :      * Because of that we must mark the cache entry as invalid but not damage
                               2435                 :                :      * any of its substructure here.  The next get_rel_sync_entry() call will
                               2436                 :                :      * rebuild it all.
                               2437                 :                :      */
 1500 akapila@postgresql.o     2438         [ +  + ]:           4142 :     if (OidIsValid(relid))
                               2439                 :                :     {
                               2440                 :                :         /*
                               2441                 :                :          * Getting invalidations for relations that aren't in the table is
                               2442                 :                :          * entirely normal.  So we don't care if it's found or not.
                               2443                 :                :          */
                               2444                 :           4073 :         entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
                               2445                 :                :                                                   HASH_FIND, NULL);
                               2446         [ +  + ]:           4073 :         if (entry != NULL)
                               2447                 :            715 :             entry->replicate_valid = false;
                               2448                 :                :     }
                               2449                 :                :     else
                               2450                 :                :     {
                               2451                 :                :         /* Whole cache must be flushed. */
                               2452                 :                :         HASH_SEQ_STATUS status;
                               2453                 :                : 
                               2454                 :             69 :         hash_seq_init(&status, RelationSyncCache);
                               2455         [ +  + ]:            138 :         while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
                               2456                 :                :         {
                               2457                 :             69 :             entry->replicate_valid = false;
                               2458                 :                :         }
                               2459                 :                :     }
                               2460                 :                : }
                               2461                 :                : 
                               2462                 :                : /*
                               2463                 :                :  * Publication relation/schema map syscache invalidation callback
                               2464                 :                :  *
                               2465                 :                :  * Called for invalidations on pg_namespace.
                               2466                 :                :  */
                               2467                 :                : static void
   25 michael@paquier.xyz      2468                 :GNC          41 : rel_sync_cache_publication_cb(Datum arg, SysCacheIdentifier cacheid,
                               2469                 :                :                               uint32 hashvalue)
                               2470                 :                : {
                               2471                 :                :     HASH_SEQ_STATUS status;
                               2472                 :                :     RelationSyncEntry *entry;
                               2473                 :                : 
                               2474                 :                :     /*
                               2475                 :                :      * We can get here if the plugin was used in SQL interface as the
                               2476                 :                :      * RelationSyncCache is destroyed when the decoding finishes, but there is
                               2477                 :                :      * no way to unregister the invalidation callbacks.
                               2478                 :                :      */
 3342 peter_e@gmx.net          2479         [ +  + ]:CBC          41 :     if (RelationSyncCache == NULL)
                               2480                 :             13 :         return;
                               2481                 :                : 
                               2482                 :                :     /*
                               2483                 :                :      * We have no easy way to identify which cache entries this invalidation
                               2484                 :                :      * event might have affected, so just mark them all invalid.
                               2485                 :                :      */
                               2486                 :             28 :     hash_seq_init(&status, RelationSyncCache);
                               2487         [ +  + ]:             49 :     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
                               2488                 :                :     {
                               2489                 :             21 :         entry->replicate_valid = false;
                               2490                 :                :     }
                               2491                 :                : }
                               2492                 :                : 
                               2493                 :                : /* Send Replication origin */
                               2494                 :                : static void
   46 msawada@postgresql.o     2495                 :GNC        1083 : send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id,
                               2496                 :                :                  XLogRecPtr origin_lsn, bool send_origin)
                               2497                 :                : {
 1705 akapila@postgresql.o     2498         [ +  + ]:CBC        1083 :     if (send_origin)
                               2499                 :                :     {
                               2500                 :                :         char       *origin;
                               2501                 :                : 
                               2502                 :                :         /*----------
                               2503                 :                :          * XXX: which behaviour do we want here?
                               2504                 :                :          *
                               2505                 :                :          * Alternatives:
                               2506                 :                :          *  - don't send origin message if origin name not found
                               2507                 :                :          *    (that's what we do now)
                               2508                 :                :          *  - throw error - that will break replication, not good
                               2509                 :                :          *  - send some special "unknown" origin
                               2510                 :                :          *----------
                               2511                 :                :          */
                               2512         [ +  - ]:              9 :         if (replorigin_by_oid(origin_id, true, &origin))
                               2513                 :                :         {
                               2514                 :                :             /* Message boundary */
                               2515                 :              9 :             OutputPluginWrite(ctx, false);
                               2516                 :              9 :             OutputPluginPrepareWrite(ctx, true);
                               2517                 :                : 
                               2518                 :              9 :             logicalrep_write_origin(ctx->out, origin, origin_lsn);
                               2519                 :                :         }
                               2520                 :                :     }
                               2521                 :           1083 : }
        

Generated by: LCOV version 2.4-beta