LCOV - differential code coverage report
Current view: top level - src/backend/commands - publicationcmds.c (source / functions) Coverage Total Hit UNC UIC UBC GBC GNC CBC DUB DCB
Current: 0e5ff9b9b45a657aea12440478dc002e9b01f138 vs 0123ce131fca454009439dfa3b2266d1d40737d7 Lines: 92.6 % 726 672 5 49 49 623 2 21
Current Date: 2026-03-14 14:10:32 -0400 Functions: 96.9 % 32 31 1 12 19
Baseline: lcov-20260315-024220-baseline Branches: 77.7 % 611 475 33 1 102 1 49 425 14 26
Baseline Date: 2026-03-14 15:27:56 +0100 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 21 21 21
(30,360] days: 87.8 % 41 36 5 28 8
(360..) days: 92.6 % 664 615 49 615
Function coverage date bins:
(360..) days: 96.9 % 32 31 1 12 19
Branch coverage date bins:
(7,30] days: 75.0 % 4 3 1 3
(30,360] days: 61.9 % 84 52 32 46 6
(360..) days: 80.3 % 523 420 1 102 1 419

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * publicationcmds.c
                                  4                 :                :  *      publication manipulation
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                  7                 :                :  * Portions Copyright (c) 1994, Regents of the University of California
                                  8                 :                :  *
                                  9                 :                :  * IDENTIFICATION
                                 10                 :                :  *      src/backend/commands/publicationcmds.c
                                 11                 :                :  *
                                 12                 :                :  *-------------------------------------------------------------------------
                                 13                 :                :  */
                                 14                 :                : 
                                 15                 :                : #include "postgres.h"
                                 16                 :                : 
                                 17                 :                : #include "access/htup_details.h"
                                 18                 :                : #include "access/table.h"
                                 19                 :                : #include "access/xact.h"
                                 20                 :                : #include "catalog/catalog.h"
                                 21                 :                : #include "catalog/indexing.h"
                                 22                 :                : #include "catalog/namespace.h"
                                 23                 :                : #include "catalog/objectaccess.h"
                                 24                 :                : #include "catalog/objectaddress.h"
                                 25                 :                : #include "catalog/pg_database.h"
                                 26                 :                : #include "catalog/pg_inherits.h"
                                 27                 :                : #include "catalog/pg_namespace.h"
                                 28                 :                : #include "catalog/pg_proc.h"
                                 29                 :                : #include "catalog/pg_publication.h"
                                 30                 :                : #include "catalog/pg_publication_namespace.h"
                                 31                 :                : #include "catalog/pg_publication_rel.h"
                                 32                 :                : #include "commands/defrem.h"
                                 33                 :                : #include "commands/event_trigger.h"
                                 34                 :                : #include "commands/publicationcmds.h"
                                 35                 :                : #include "miscadmin.h"
                                 36                 :                : #include "nodes/nodeFuncs.h"
                                 37                 :                : #include "parser/parse_clause.h"
                                 38                 :                : #include "parser/parse_collate.h"
                                 39                 :                : #include "parser/parse_relation.h"
                                 40                 :                : #include "rewrite/rewriteHandler.h"
                                 41                 :                : #include "storage/lmgr.h"
                                 42                 :                : #include "utils/acl.h"
                                 43                 :                : #include "utils/builtins.h"
                                 44                 :                : #include "utils/inval.h"
                                 45                 :                : #include "utils/lsyscache.h"
                                 46                 :                : #include "utils/rel.h"
                                 47                 :                : #include "utils/syscache.h"
                                 48                 :                : #include "utils/varlena.h"
                                 49                 :                : 
                                 50                 :                : 
                                 51                 :                : /*
                                 52                 :                :  * Information used to validate the columns in the row filter expression. See
                                 53                 :                :  * contain_invalid_rfcolumn_walker for details.
                                 54                 :                :  */
                                 55                 :                : typedef struct rf_context
                                 56                 :                : {
                                 57                 :                :     Bitmapset  *bms_replident;  /* bitset of replica identity columns */
                                 58                 :                :     bool        pubviaroot;     /* true if we are validating the parent
                                 59                 :                :                                  * relation's row filter */
                                 60                 :                :     Oid         relid;          /* relid of the relation */
                                 61                 :                :     Oid         parentid;       /* relid of the parent relation */
                                 62                 :                : } rf_context;
                                 63                 :                : 
                                 64                 :                : static List *OpenTableList(List *tables);
                                 65                 :                : static void CloseTableList(List *rels);
                                 66                 :                : static void LockSchemaList(List *schemalist);
                                 67                 :                : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
                                 68                 :                :                                  AlterPublicationStmt *stmt);
                                 69                 :                : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
                                 70                 :                : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
                                 71                 :                :                                   AlterPublicationStmt *stmt);
                                 72                 :                : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
                                 73                 :                : static char defGetGeneratedColsOption(DefElem *def);
                                 74                 :                : 
                                 75                 :                : 
                                 76                 :                : static void
 1704 dean.a.rasheed@gmail       77                 :CBC         537 : parse_publication_options(ParseState *pstate,
                                 78                 :                :                           List *options,
                                 79                 :                :                           bool *publish_given,
                                 80                 :                :                           PublicationActions *pubactions,
                                 81                 :                :                           bool *publish_via_partition_root_given,
                                 82                 :                :                           bool *publish_via_partition_root,
                                 83                 :                :                           bool *publish_generated_columns_given,
                                 84                 :                :                           char *publish_generated_columns)
                                 85                 :                : {
                                 86                 :                :     ListCell   *lc;
                                 87                 :                : 
 3229 peter_e@gmx.net            88                 :            537 :     *publish_given = false;
 2167 peter@eisentraut.org       89                 :            537 :     *publish_via_partition_root_given = false;
  493 akapila@postgresql.o       90                 :            537 :     *publish_generated_columns_given = false;
                                 91                 :                : 
                                 92                 :                :     /* defaults */
 2167 peter@eisentraut.org       93                 :            537 :     pubactions->pubinsert = true;
                                 94                 :            537 :     pubactions->pubupdate = true;
                                 95                 :            537 :     pubactions->pubdelete = true;
                                 96                 :            537 :     pubactions->pubtruncate = true;
                                 97                 :            537 :     *publish_via_partition_root = false;
  416 akapila@postgresql.o       98                 :            537 :     *publish_generated_columns = PUBLISH_GENCOLS_NONE;
                                 99                 :                : 
                                100                 :                :     /* Parse options */
 3224 bruce@momjian.us          101   [ +  +  +  +  :            721 :     foreach(lc, options)
                                              +  + ]
                                102                 :                :     {
 3342 peter_e@gmx.net           103                 :            202 :         DefElem    *defel = (DefElem *) lfirst(lc);
                                104                 :                : 
 3229                           105         [ +  + ]:            202 :         if (strcmp(defel->defname, "publish") == 0)
                                106                 :                :         {
                                107                 :                :             char       *publish;
                                108                 :                :             List       *publish_list;
                                109                 :                :             ListCell   *lc2;
                                110                 :                : 
                                111         [ -  + ]:             70 :             if (*publish_given)
 1704 dean.a.rasheed@gmail      112                 :UBC           0 :                 errorConflictingDefElem(defel, pstate);
                                113                 :                : 
                                114                 :                :             /*
                                115                 :                :              * If publish option was given only the explicitly listed actions
                                116                 :                :              * should be published.
                                117                 :                :              */
 2167 peter@eisentraut.org      118                 :CBC          70 :             pubactions->pubinsert = false;
                                119                 :             70 :             pubactions->pubupdate = false;
                                120                 :             70 :             pubactions->pubdelete = false;
                                121                 :             70 :             pubactions->pubtruncate = false;
                                122                 :                : 
 3229 peter_e@gmx.net           123                 :             70 :             *publish_given = true;
                                124                 :                : 
                                125                 :                :             /*
                                126                 :                :              * SplitIdentifierString destructively modifies its input, so make
                                127                 :                :              * a copy so we don't modify the memory of the executing statement
                                128                 :                :              */
   68 drowley@postgresql.o      129                 :             70 :             publish = pstrdup(defGetString(defel));
                                130                 :                : 
 3229 peter_e@gmx.net           131         [ -  + ]:             70 :             if (!SplitIdentifierString(publish, ',', &publish_list))
 3342 peter_e@gmx.net           132         [ #  # ]:UBC           0 :                 ereport(ERROR,
                                133                 :                :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                134                 :                :                          errmsg("invalid list syntax in parameter \"%s\"",
                                135                 :                :                                 "publish")));
                                136                 :                : 
                                137                 :                :             /* Process the option list. */
 1257 drowley@postgresql.o      138   [ +  +  +  +  :CBC         167 :             foreach(lc2, publish_list)
                                              +  + ]
                                139                 :                :             {
                                140                 :            100 :                 char       *publish_opt = (char *) lfirst(lc2);
                                141                 :                : 
 3229 peter_e@gmx.net           142         [ +  + ]:            100 :                 if (strcmp(publish_opt, "insert") == 0)
 2167 peter@eisentraut.org      143                 :             62 :                     pubactions->pubinsert = true;
 3229 peter_e@gmx.net           144         [ +  + ]:             38 :                 else if (strcmp(publish_opt, "update") == 0)
 2167 peter@eisentraut.org      145                 :             15 :                     pubactions->pubupdate = true;
 3229 peter_e@gmx.net           146         [ +  + ]:             23 :                 else if (strcmp(publish_opt, "delete") == 0)
 2167 peter@eisentraut.org      147                 :             10 :                     pubactions->pubdelete = true;
 2899 peter_e@gmx.net           148         [ +  + ]:             13 :                 else if (strcmp(publish_opt, "truncate") == 0)
 2167 peter@eisentraut.org      149                 :             10 :                     pubactions->pubtruncate = true;
                                150                 :                :                 else
 3229 peter_e@gmx.net           151         [ +  - ]:              3 :                     ereport(ERROR,
                                152                 :                :                             (errcode(ERRCODE_SYNTAX_ERROR),
                                153                 :                :                              errmsg("unrecognized value for publication option \"%s\": \"%s\"",
                                154                 :                :                                     "publish", publish_opt)));
                                155                 :                :             }
                                156                 :                :         }
 2167 peter@eisentraut.org      157         [ +  + ]:            132 :         else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
                                158                 :                :         {
                                159         [ +  + ]:             88 :             if (*publish_via_partition_root_given)
 1704 dean.a.rasheed@gmail      160                 :              3 :                 errorConflictingDefElem(defel, pstate);
 2167 peter@eisentraut.org      161                 :             85 :             *publish_via_partition_root_given = true;
                                162                 :             85 :             *publish_via_partition_root = defGetBoolean(defel);
                                163                 :                :         }
  493 akapila@postgresql.o      164         [ +  + ]:             44 :         else if (strcmp(defel->defname, "publish_generated_columns") == 0)
                                165                 :                :         {
                                166         [ +  + ]:             41 :             if (*publish_generated_columns_given)
                                167                 :              3 :                 errorConflictingDefElem(defel, pstate);
                                168                 :             38 :             *publish_generated_columns_given = true;
  416                           169                 :             38 :             *publish_generated_columns = defGetGeneratedColsOption(defel);
                                170                 :                :         }
                                171                 :                :         else
 3229 peter_e@gmx.net           172         [ +  - ]:              3 :             ereport(ERROR,
                                173                 :                :                     (errcode(ERRCODE_SYNTAX_ERROR),
                                174                 :                :                      errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
                                175                 :                :     }
 3342                           176                 :            519 : }
                                177                 :                : 
                                178                 :                : /*
                                179                 :                :  * Convert the PublicationObjSpecType list into schema oid list and
                                180                 :                :  * PublicationTable list.
                                181                 :                :  */
                                182                 :                : static void
 1600 akapila@postgresql.o      183                 :            944 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
                                184                 :                :                            List **rels, List **exceptrels, List **schemas)
                                185                 :                : {
                                186                 :                :     ListCell   *cell;
                                187                 :                :     PublicationObjSpec *pubobj;
                                188                 :                : 
                                189         [ +  + ]:            944 :     if (!pubobjspec_list)
                                190                 :            116 :         return;
                                191                 :                : 
                                192   [ +  -  +  +  :           1760 :     foreach(cell, pubobjspec_list)
                                              +  + ]
                                193                 :                :     {
                                194                 :                :         Oid         schemaid;
                                195                 :                :         List       *search_path;
                                196                 :                : 
                                197                 :            950 :         pubobj = (PublicationObjSpec *) lfirst(cell);
                                198                 :                : 
                                199   [ +  +  +  +  :            950 :         switch (pubobj->pubobjtype)
                                                 - ]
                                200                 :                :         {
   11 akapila@postgresql.o      201                 :GNC          34 :             case PUBLICATIONOBJ_EXCEPT_TABLE:
                                202                 :             34 :                 pubobj->pubtable->except = true;
                                203                 :             34 :                 *exceptrels = lappend(*exceptrels, pubobj->pubtable);
                                204                 :             34 :                 break;
 1600 akapila@postgresql.o      205                 :CBC         718 :             case PUBLICATIONOBJ_TABLE:
   11 akapila@postgresql.o      206                 :GNC         718 :                 pubobj->pubtable->except = false;
 1438 tomas.vondra@postgre      207                 :CBC         718 :                 *rels = lappend(*rels, pubobj->pubtable);
 1600 akapila@postgresql.o      208                 :            718 :                 break;
 1536 alvherre@alvh.no-ip.      209                 :            186 :             case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
 1600 akapila@postgresql.o      210                 :            186 :                 schemaid = get_namespace_oid(pubobj->name, false);
                                211                 :                : 
                                212                 :                :                 /* Filter out duplicates if user specifies "sch1, sch1" */
 1438 tomas.vondra@postgre      213                 :            171 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
 1600 akapila@postgresql.o      214                 :            171 :                 break;
 1536 alvherre@alvh.no-ip.      215                 :             12 :             case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
 1600 akapila@postgresql.o      216                 :             12 :                 search_path = fetch_search_path(false);
                                217         [ +  + ]:             12 :                 if (search_path == NIL) /* nothing valid in search_path? */
                                218         [ +  - ]:              3 :                     ereport(ERROR,
                                219                 :                :                             errcode(ERRCODE_UNDEFINED_SCHEMA),
                                220                 :                :                             errmsg("no schema has been selected for CURRENT_SCHEMA"));
                                221                 :                : 
                                222                 :              9 :                 schemaid = linitial_oid(search_path);
                                223                 :              9 :                 list_free(search_path);
                                224                 :                : 
                                225                 :                :                 /* Filter out duplicates if user specifies "sch1, sch1" */
 1438 tomas.vondra@postgre      226                 :              9 :                 *schemas = list_append_unique_oid(*schemas, schemaid);
 1600 akapila@postgresql.o      227                 :              9 :                 break;
 1600 akapila@postgresql.o      228                 :UBC           0 :             default:
                                229                 :                :                 /* shouldn't happen */
                                230         [ #  # ]:              0 :                 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
                                231                 :                :                 break;
                                232                 :                :         }
                                233                 :                :     }
                                234                 :                : }
                                235                 :                : 
                                236                 :                : /*
                                237                 :                :  * Returns true if any of the columns used in the row filter WHERE expression is
                                238                 :                :  * not part of REPLICA IDENTITY, false otherwise.
                                239                 :                :  */
                                240                 :                : static bool
 1482 akapila@postgresql.o      241                 :CBC         129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
                                242                 :                : {
                                243         [ -  + ]:            129 :     if (node == NULL)
 1482 akapila@postgresql.o      244                 :UBC           0 :         return false;
                                245                 :                : 
 1482 akapila@postgresql.o      246         [ +  + ]:CBC         129 :     if (IsA(node, Var))
                                247                 :                :     {
                                248                 :             52 :         Var        *var = (Var *) node;
                                249                 :             52 :         AttrNumber  attnum = var->varattno;
                                250                 :                : 
                                251                 :                :         /*
                                252                 :                :          * If pubviaroot is true, we are validating the row filter of the
                                253                 :                :          * parent table, but the bitmap contains the replica identity
                                254                 :                :          * information of the child table. So, get the column number of the
                                255                 :                :          * child table as parent and child column order could be different.
                                256                 :                :          */
                                257         [ +  + ]:             52 :         if (context->pubviaroot)
                                258                 :                :         {
                                259                 :              8 :             char       *colname = get_attname(context->parentid, attnum, false);
                                260                 :                : 
                                261                 :              8 :             attnum = get_attnum(context->relid, colname);
                                262                 :                :         }
                                263                 :                : 
                                264         [ +  + ]:             52 :         if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
                                265                 :             52 :                            context->bms_replident))
                                266                 :             30 :             return true;
                                267                 :                :     }
                                268                 :                : 
                                269                 :             99 :     return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
                                270                 :                :                                   context);
                                271                 :                : }
                                272                 :                : 
                                273                 :                : /*
                                274                 :                :  * Check if all columns referenced in the filter expression are part of the
                                275                 :                :  * REPLICA IDENTITY index or not.
                                276                 :                :  *
                                277                 :                :  * Returns true if any invalid column is found.
                                278                 :                :  */
                                279                 :                : bool
 1450 tomas.vondra@postgre      280                 :            364 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
                                281                 :                :                                bool pubviaroot)
                                282                 :                : {
                                283                 :                :     HeapTuple   rftuple;
 1482 akapila@postgresql.o      284                 :            364 :     Oid         relid = RelationGetRelid(relation);
                                285                 :            364 :     Oid         publish_as_relid = RelationGetRelid(relation);
                                286                 :            364 :     bool        result = false;
                                287                 :                :     Datum       rfdatum;
                                288                 :                :     bool        rfisnull;
                                289                 :                : 
                                290                 :                :     /*
                                291                 :                :      * FULL means all columns are in the REPLICA IDENTITY, so all columns are
                                292                 :                :      * allowed in the row filter and we can skip the validation.
                                293                 :                :      */
                                294         [ +  + ]:            364 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
                                295                 :             99 :         return false;
                                296                 :                : 
                                297                 :                :     /*
                                298                 :                :      * For a partition, if pubviaroot is true, find the topmost ancestor that
                                299                 :                :      * is published via this publication as we need to use its row filter
                                300                 :                :      * expression to filter the partition's changes.
                                301                 :                :      *
                                302                 :                :      * Note that even though the row filter used is for an ancestor, the
                                303                 :                :      * REPLICA IDENTITY used will be for the actual child table.
                                304                 :                :      */
                                305   [ +  +  +  + ]:            265 :     if (pubviaroot && relation->rd_rel->relispartition)
                                306                 :                :     {
                                307                 :                :         publish_as_relid
 1460 tomas.vondra@postgre      308                 :             54 :             = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
                                309                 :                : 
 1482 akapila@postgresql.o      310         [ +  + ]:             54 :         if (!OidIsValid(publish_as_relid))
                                311                 :              3 :             publish_as_relid = relid;
                                312                 :                :     }
                                313                 :                : 
                                314                 :            265 :     rftuple = SearchSysCache2(PUBLICATIONRELMAP,
                                315                 :                :                               ObjectIdGetDatum(publish_as_relid),
                                316                 :                :                               ObjectIdGetDatum(pubid));
                                317                 :                : 
                                318         [ +  + ]:            265 :     if (!HeapTupleIsValid(rftuple))
                                319                 :             28 :         return false;
                                320                 :                : 
                                321                 :            237 :     rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
                                322                 :                :                               Anum_pg_publication_rel_prqual,
                                323                 :                :                               &rfisnull);
                                324                 :                : 
                                325         [ +  + ]:            237 :     if (!rfisnull)
                                326                 :                :     {
                                327                 :             51 :         rf_context  context = {0};
                                328                 :                :         Node       *rfnode;
                                329                 :             51 :         Bitmapset  *bms = NULL;
                                330                 :                : 
                                331                 :             51 :         context.pubviaroot = pubviaroot;
                                332                 :             51 :         context.parentid = publish_as_relid;
                                333                 :             51 :         context.relid = relid;
                                334                 :                : 
                                335                 :                :         /* Remember columns that are part of the REPLICA IDENTITY */
                                336                 :             51 :         bms = RelationGetIndexAttrBitmap(relation,
                                337                 :                :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
                                338                 :                : 
                                339                 :             51 :         context.bms_replident = bms;
                                340                 :             51 :         rfnode = stringToNode(TextDatumGetCString(rfdatum));
                                341                 :             51 :         result = contain_invalid_rfcolumn_walker(rfnode, &context);
                                342                 :                :     }
                                343                 :                : 
                                344                 :            237 :     ReleaseSysCache(rftuple);
                                345                 :                : 
                                346                 :            237 :     return result;
                                347                 :                : }
                                348                 :                : 
                                349                 :                : /*
                                350                 :                :  * Check for invalid columns in the publication table definition.
                                351                 :                :  *
                                352                 :                :  * This function evaluates two conditions:
                                353                 :                :  *
                                354                 :                :  * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
                                355                 :                :  *    by the column list. If any column is missing, *invalid_column_list is set
                                356                 :                :  *    to true.
                                357                 :                :  * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
                                358                 :                :  *    are published, either by being explicitly named in the column list or, if
                                359                 :                :  *    no column list is specified, by setting the option
                                360                 :                :  *    publish_generated_columns to stored. If any unpublished
                                361                 :                :  *    generated column is found, *invalid_gen_col is set to true.
                                362                 :                :  *
                                363                 :                :  * Returns true if any of the above conditions are not met.
                                364                 :                :  */
                                365                 :                : bool
  466                           366                 :            472 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
                                367                 :                :                             bool pubviaroot, char pubgencols_type,
                                368                 :                :                             bool *invalid_column_list,
                                369                 :                :                             bool *invalid_gen_col)
                                370                 :                : {
 1450 tomas.vondra@postgre      371                 :            472 :     Oid         relid = RelationGetRelid(relation);
                                372                 :            472 :     Oid         publish_as_relid = RelationGetRelid(relation);
                                373                 :                :     Bitmapset  *idattrs;
  466 akapila@postgresql.o      374                 :            472 :     Bitmapset  *columns = NULL;
                                375                 :            472 :     TupleDesc   desc = RelationGetDescr(relation);
                                376                 :                :     Publication *pub;
                                377                 :                :     int         x;
                                378                 :                : 
                                379                 :            472 :     *invalid_column_list = false;
                                380                 :            472 :     *invalid_gen_col = false;
                                381                 :                : 
                                382                 :                :     /*
                                383                 :                :      * For a partition, if pubviaroot is true, find the topmost ancestor that
                                384                 :                :      * is published via this publication as we need to use its column list for
                                385                 :                :      * the changes.
                                386                 :                :      *
                                387                 :                :      * Note that even though the column list used is for an ancestor, the
                                388                 :                :      * REPLICA IDENTITY used will be for the actual child table.
                                389                 :                :      */
 1450 tomas.vondra@postgre      390   [ +  +  +  + ]:            472 :     if (pubviaroot && relation->rd_rel->relispartition)
                                391                 :                :     {
                                392                 :             80 :         publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
                                393                 :                : 
                                394         [ +  + ]:             80 :         if (!OidIsValid(publish_as_relid))
                                395                 :             18 :             publish_as_relid = relid;
                                396                 :                :     }
                                397                 :                : 
                                398                 :                :     /* Fetch the column list */
  466 akapila@postgresql.o      399                 :            472 :     pub = GetPublication(pubid);
                                400                 :            472 :     check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
                                401                 :                : 
                                402         [ +  + ]:            472 :     if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
                                403                 :                :     {
                                404                 :                :         /* With REPLICA IDENTITY FULL, no column list is allowed. */
                                405                 :            106 :         *invalid_column_list = (columns != NULL);
                                406                 :                : 
                                407                 :                :         /*
                                408                 :                :          * As we don't allow a column list with REPLICA IDENTITY FULL, the
                                409                 :                :          * publish_generated_columns option must be set to stored if the table
                                410                 :                :          * has any stored generated columns.
                                411                 :                :          */
  416                           412         [ +  + ]:            106 :         if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
  466                           413         [ +  + ]:            100 :             relation->rd_att->constr &&
                                414         [ +  + ]:             44 :             relation->rd_att->constr->has_generated_stored)
                                415                 :              3 :             *invalid_gen_col = true;
                                416                 :                : 
                                417                 :                :         /*
                                418                 :                :          * Virtual generated columns are currently not supported for logical
                                419                 :                :          * replication at all.
                                420                 :                :          */
  401 peter@eisentraut.org      421         [ +  + ]:            106 :         if (relation->rd_att->constr &&
                                422         [ +  + ]:             50 :             relation->rd_att->constr->has_generated_virtual)
                                423                 :              6 :             *invalid_gen_col = true;
                                424                 :                : 
  466 akapila@postgresql.o      425   [ +  +  -  + ]:            106 :         if (*invalid_gen_col && *invalid_column_list)
  466 akapila@postgresql.o      426                 :UBC           0 :             return true;
                                427                 :                :     }
                                428                 :                : 
                                429                 :                :     /* Remember columns that are part of the REPLICA IDENTITY */
  466 akapila@postgresql.o      430                 :CBC         472 :     idattrs = RelationGetIndexAttrBitmap(relation,
                                431                 :                :                                          INDEX_ATTR_BITMAP_IDENTITY_KEY);
                                432                 :                : 
                                433                 :                :     /*
                                434                 :                :      * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
                                435                 :                :      * (to handle system columns the usual way), while column list does not
                                436                 :                :      * use offset, so we can't do bms_is_subset(). Instead, we have to loop
                                437                 :                :      * over the idattrs and check all of them are in the list.
                                438                 :                :      */
                                439                 :            472 :     x = -1;
                                440         [ +  + ]:            801 :     while ((x = bms_next_member(idattrs, x)) >= 0)
                                441                 :                :     {
                                442                 :            332 :         AttrNumber  attnum = (x + FirstLowInvalidHeapAttributeNumber);
                                443                 :            332 :         Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
                                444                 :                : 
                                445         [ +  + ]:            332 :         if (columns == NULL)
                                446                 :                :         {
                                447                 :                :             /*
                                448                 :                :              * The publish_generated_columns option must be set to stored if
                                449                 :                :              * the REPLICA IDENTITY contains any stored generated column.
                                450                 :                :              */
  401 peter@eisentraut.org      451   [ +  +  +  - ]:            227 :             if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
                                452                 :                :             {
                                453                 :              3 :                 *invalid_gen_col = true;
                                454                 :              3 :                 break;
                                455                 :                :             }
                                456                 :                : 
                                457                 :                :             /*
                                458                 :                :              * The equivalent setting for virtual generated columns does not
                                459                 :                :              * exist yet.
                                460                 :                :              */
                                461         [ -  + ]:            224 :             if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
                                462                 :                :             {
  466 akapila@postgresql.o      463                 :UBC           0 :                 *invalid_gen_col = true;
                                464                 :              0 :                 break;
                                465                 :                :             }
                                466                 :                : 
                                467                 :                :             /* Skip validating the column list since it is not defined */
  466 akapila@postgresql.o      468                 :CBC         224 :             continue;
                                469                 :                :         }
                                470                 :                : 
                                471                 :                :         /*
                                472                 :                :          * If pubviaroot is true, we are validating the column list of the
                                473                 :                :          * parent table, but the bitmap contains the replica identity
                                474                 :                :          * information of the child table. The parent/child attnums may not
                                475                 :                :          * match, so translate them to the parent - get the attname from the
                                476                 :                :          * child, and look it up in the parent.
                                477                 :                :          */
                                478         [ +  + ]:            105 :         if (pubviaroot)
                                479                 :                :         {
                                480                 :                :             /* attribute name in the child table */
                                481                 :             40 :             char       *colname = get_attname(relid, attnum, false);
                                482                 :                : 
                                483                 :                :             /*
                                484                 :                :              * Determine the attnum for the attribute name in parent (we are
                                485                 :                :              * using the column list defined on the parent).
                                486                 :                :              */
                                487                 :             40 :             attnum = get_attnum(publish_as_relid, colname);
                                488                 :                :         }
                                489                 :                : 
                                490                 :                :         /* replica identity column, not covered by the column list */
                                491                 :            105 :         *invalid_column_list |= !bms_is_member(attnum, columns);
                                492                 :                : 
                                493   [ +  +  -  + ]:            105 :         if (*invalid_column_list && *invalid_gen_col)
  466 akapila@postgresql.o      494                 :UBC           0 :             break;
                                495                 :                :     }
                                496                 :                : 
  466 akapila@postgresql.o      497                 :CBC         472 :     bms_free(columns);
                                498                 :            472 :     bms_free(idattrs);
                                499                 :                : 
                                500   [ +  +  +  + ]:            472 :     return *invalid_column_list || *invalid_gen_col;
                                501                 :                : }
                                502                 :                : 
                                503                 :                : /*
                                504                 :                :  * Invalidate entries in the RelationSyncCache for relations included in the
                                505                 :                :  * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
                                506                 :                :  *
                                507                 :                :  * If 'puballtables' is true, invalidate all cache entries.
                                508                 :                :  */
                                509                 :                : void
  367                           510                 :             18 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
                                511                 :                : {
                                512         [ +  + ]:             18 :     if (puballtables)
                                513                 :                :     {
                                514                 :              3 :         CacheInvalidateRelSyncAll();
                                515                 :                :     }
                                516                 :                :     else
                                517                 :                :     {
                                518                 :             15 :         List       *relids = NIL;
                                519                 :             15 :         List       *schemarelids = NIL;
                                520                 :                : 
                                521                 :                :         /*
                                522                 :                :          * For partitioned tables, we must invalidate all partitions and
                                523                 :                :          * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
                                524                 :                :          * a target. However, WAL records for TRUNCATE specify both a root and
                                525                 :                :          * its leaves.
                                526                 :                :          */
   11 akapila@postgresql.o      527                 :GNC          15 :         relids = GetIncludedPublicationRelations(pubid,
                                528                 :                :                                                  PUBLICATION_PART_ALL);
  367 akapila@postgresql.o      529                 :CBC          15 :         schemarelids = GetAllSchemaPublicationRelations(pubid,
                                530                 :                :                                                         PUBLICATION_PART_ALL);
                                531                 :                : 
                                532                 :             15 :         relids = list_concat_unique_oid(relids, schemarelids);
                                533                 :                : 
                                534                 :                :         /* Invalidate the relsyncache */
                                535   [ +  +  +  +  :             33 :         foreach_oid(relid, relids)
                                              +  + ]
                                536                 :              3 :             CacheInvalidateRelSync(relid);
                                537                 :                :     }
                                538                 :                : 
                                539                 :             18 :     return;
                                540                 :                : }
                                541                 :                : 
                                542                 :                : /* check_functions_in_node callback */
                                543                 :                : static bool
 1482                           544                 :            218 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
                                545                 :                : {
                                546   [ +  +  +  + ]:            218 :     return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
                                547                 :                :             func_id >= FirstNormalObjectId);
                                548                 :                : }
                                549                 :                : 
                                550                 :                : /*
                                551                 :                :  * The row filter walker checks if the row filter expression is a "simple
                                552                 :                :  * expression".
                                553                 :                :  *
                                554                 :                :  * It allows only simple or compound expressions such as:
                                555                 :                :  * - (Var Op Const)
                                556                 :                :  * - (Var Op Var)
                                557                 :                :  * - (Var Op Const) AND/OR (Var Op Const)
                                558                 :                :  * - etc
                                559                 :                :  * (where Var is a column of the table this filter belongs to)
                                560                 :                :  *
                                561                 :                :  * The simple expression has the following restrictions:
                                562                 :                :  * - User-defined operators are not allowed;
                                563                 :                :  * - User-defined functions are not allowed;
                                564                 :                :  * - User-defined types are not allowed;
                                565                 :                :  * - User-defined collations are not allowed;
                                566                 :                :  * - Non-immutable built-in functions are not allowed;
                                567                 :                :  * - System columns are not allowed.
                                568                 :                :  *
                                569                 :                :  * NOTES
                                570                 :                :  *
                                571                 :                :  * We don't allow user-defined functions/operators/types/collations because
                                572                 :                :  * (a) if a user drops a user-defined object used in a row filter expression or
                                573                 :                :  * if there is any other error while using it, the logical decoding
                                574                 :                :  * infrastructure won't be able to recover from such an error even if the
                                575                 :                :  * object is recreated again because a historic snapshot is used to evaluate
                                576                 :                :  * the row filter;
                                577                 :                :  * (b) a user-defined function can be used to access tables that could have
                                578                 :                :  * unpleasant results because a historic snapshot is used. That's why only
                                579                 :                :  * immutable built-in functions are allowed in row filter expressions.
                                580                 :                :  *
                                581                 :                :  * We don't allow system columns because currently, we don't have that
                                582                 :                :  * information in the tuple passed to downstream. Also, as we don't replicate
                                583                 :                :  * those to subscribers, there doesn't seem to be a need for a filter on those
                                584                 :                :  * columns.
                                585                 :                :  *
                                586                 :                :  * We can allow other node types after more analysis and testing.
                                587                 :                :  */
                                588                 :                : static bool
                                589                 :            720 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
                                590                 :                : {
                                591                 :            720 :     char       *errdetail_msg = NULL;
                                592                 :                : 
                                593         [ +  + ]:            720 :     if (node == NULL)
                                594                 :              3 :         return false;
                                595                 :                : 
                                596   [ +  +  +  +  :            717 :     switch (nodeTag(node))
                                              +  + ]
                                597                 :                :     {
                                598                 :            194 :         case T_Var:
                                599                 :                :             /* System columns are not allowed. */
                                600         [ +  + ]:            194 :             if (((Var *) node)->varattno < InvalidAttrNumber)
                                601                 :              3 :                 errdetail_msg = _("System columns are not allowed.");
                                602                 :            194 :             break;
                                603                 :            196 :         case T_OpExpr:
                                604                 :                :         case T_DistinctExpr:
                                605                 :                :         case T_NullIfExpr:
                                606                 :                :             /* OK, except user-defined operators are not allowed. */
                                607         [ +  + ]:            196 :             if (((OpExpr *) node)->opno >= FirstNormalObjectId)
                                608                 :              3 :                 errdetail_msg = _("User-defined operators are not allowed.");
                                609                 :            196 :             break;
                                610                 :              3 :         case T_ScalarArrayOpExpr:
                                611                 :                :             /* OK, except user-defined operators are not allowed. */
                                612         [ -  + ]:              3 :             if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
 1482 akapila@postgresql.o      613                 :UBC           0 :                 errdetail_msg = _("User-defined operators are not allowed.");
                                614                 :                : 
                                615                 :                :             /*
                                616                 :                :              * We don't need to check the hashfuncid and negfuncid of
                                617                 :                :              * ScalarArrayOpExpr as those functions are only built for a
                                618                 :                :              * subquery.
                                619                 :                :              */
 1482 akapila@postgresql.o      620                 :CBC           3 :             break;
                                621                 :              3 :         case T_RowCompareExpr:
                                622                 :                :             {
                                623                 :                :                 ListCell   *opid;
                                624                 :                : 
                                625                 :                :                 /* OK, except user-defined operators are not allowed. */
                                626   [ +  -  +  +  :              9 :                 foreach(opid, ((RowCompareExpr *) node)->opnos)
                                              +  + ]
                                627                 :                :                 {
                                628         [ -  + ]:              6 :                     if (lfirst_oid(opid) >= FirstNormalObjectId)
                                629                 :                :                     {
 1482 akapila@postgresql.o      630                 :UBC           0 :                         errdetail_msg = _("User-defined operators are not allowed.");
                                631                 :              0 :                         break;
                                632                 :                :                     }
                                633                 :                :                 }
                                634                 :                :             }
 1482 akapila@postgresql.o      635                 :CBC           3 :             break;
                                636                 :            318 :         case T_Const:
                                637                 :                :         case T_FuncExpr:
                                638                 :                :         case T_BoolExpr:
                                639                 :                :         case T_RelabelType:
                                640                 :                :         case T_CollateExpr:
                                641                 :                :         case T_CaseExpr:
                                642                 :                :         case T_CaseTestExpr:
                                643                 :                :         case T_ArrayExpr:
                                644                 :                :         case T_RowExpr:
                                645                 :                :         case T_CoalesceExpr:
                                646                 :                :         case T_MinMaxExpr:
                                647                 :                :         case T_XmlExpr:
                                648                 :                :         case T_NullTest:
                                649                 :                :         case T_BooleanTest:
                                650                 :                :         case T_List:
                                651                 :                :             /* OK, supported */
                                652                 :            318 :             break;
                                653                 :              3 :         default:
 1268 peter@eisentraut.org      654                 :              3 :             errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
 1482 akapila@postgresql.o      655                 :              3 :             break;
                                656                 :                :     }
                                657                 :                : 
                                658                 :                :     /*
                                659                 :                :      * For all the supported nodes, if we haven't already found a problem,
                                660                 :                :      * check the types, functions, and collations used in it.  We check List
                                661                 :                :      * by walking through each element.
                                662                 :                :      */
 1264 alvherre@alvh.no-ip.      663   [ +  +  +  + ]:            717 :     if (!errdetail_msg && !IsA(node, List))
                                664                 :                :     {
                                665         [ +  + ]:            681 :         if (exprType(node) >= FirstNormalObjectId)
                                666                 :              3 :             errdetail_msg = _("User-defined types are not allowed.");
                                667         [ +  + ]:            678 :         else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
                                668                 :                :                                          pstate))
                                669                 :              6 :             errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
                                670   [ +  -  +  + ]:           1344 :         else if (exprCollation(node) >= FirstNormalObjectId ||
                                671                 :            672 :                  exprInputCollation(node) >= FirstNormalObjectId)
                                672                 :              3 :             errdetail_msg = _("User-defined collations are not allowed.");
                                673                 :                :     }
                                674                 :                : 
                                675                 :                :     /*
                                676                 :                :      * If we found a problem in this node, throw error now. Otherwise keep
                                677                 :                :      * going.
                                678                 :                :      */
 1482 akapila@postgresql.o      679         [ +  + ]:            717 :     if (errdetail_msg)
                                680         [ +  - ]:             21 :         ereport(ERROR,
                                681                 :                :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                682                 :                :                  errmsg("invalid publication WHERE expression"),
                                683                 :                :                  errdetail_internal("%s", errdetail_msg),
                                684                 :                :                  parser_errposition(pstate, exprLocation(node))));
                                685                 :                : 
                                686                 :            696 :     return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
                                687                 :                :                                   pstate);
                                688                 :                : }
                                689                 :                : 
                                690                 :                : /*
                                691                 :                :  * Check if the row filter expression is a "simple expression".
                                692                 :                :  *
                                693                 :                :  * See check_simple_rowfilter_expr_walker for details.
                                694                 :                :  */
                                695                 :                : static bool
                                696                 :            188 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
                                697                 :                : {
                                698                 :            188 :     return check_simple_rowfilter_expr_walker(node, pstate);
                                699                 :                : }
                                700                 :                : 
                                701                 :                : /*
                                702                 :                :  * Transform the publication WHERE expression for all the relations in the list,
                                703                 :                :  * ensuring it is coerced to boolean and necessary collation information is
                                704                 :                :  * added if required, and add a new nsitem/RTE for the associated relation to
                                705                 :                :  * the ParseState's namespace list.
                                706                 :                :  *
                                707                 :                :  * Also check the publication row filter expression and throw an error if
                                708                 :                :  * anything not permitted or unexpected is encountered.
                                709                 :                :  */
                                710                 :                : static void
                                711                 :            595 : TransformPubWhereClauses(List *tables, const char *queryString,
                                712                 :                :                          bool pubviaroot)
                                713                 :                : {
                                714                 :                :     ListCell   *lc;
                                715                 :                : 
                                716   [ +  +  +  +  :           1198 :     foreach(lc, tables)
                                              +  + ]
                                717                 :                :     {
                                718                 :                :         ParseNamespaceItem *nsitem;
                                719                 :            633 :         Node       *whereclause = NULL;
                                720                 :                :         ParseState *pstate;
                                721                 :            633 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
                                722                 :                : 
                                723         [ +  + ]:            633 :         if (pri->whereClause == NULL)
                                724                 :            436 :             continue;
                                725                 :                : 
                                726                 :                :         /*
                                727                 :                :          * If the publication doesn't publish changes via the root partitioned
                                728                 :                :          * table, the partition's row filter will be used. So disallow using
                                729                 :                :          * WHERE clause on partitioned table in this case.
                                730                 :                :          */
                                731         [ +  + ]:            197 :         if (!pubviaroot &&
                                732         [ +  + ]:            182 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                                733         [ +  - ]:              3 :             ereport(ERROR,
                                734                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                735                 :                :                      errmsg("cannot use publication WHERE clause for relation \"%s\"",
                                736                 :                :                             RelationGetRelationName(pri->relation)),
                                737                 :                :                      errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
                                738                 :                :                                "publish_via_partition_root")));
                                739                 :                : 
                                740                 :                :         /*
                                741                 :                :          * A fresh pstate is required so that we only have "this" table in its
                                742                 :                :          * rangetable
                                743                 :                :          */
                                744                 :            194 :         pstate = make_parsestate(NULL);
                                745                 :            194 :         pstate->p_sourcetext = queryString;
                                746                 :            194 :         nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
                                747                 :                :                                                AccessShareLock, NULL,
                                748                 :                :                                                false, false);
                                749                 :            194 :         addNSItemToQuery(pstate, nsitem, false, true, true);
                                750                 :                : 
                                751                 :            194 :         whereclause = transformWhereClause(pstate,
                                752                 :            194 :                                            copyObject(pri->whereClause),
                                753                 :                :                                            EXPR_KIND_WHERE,
                                754                 :                :                                            "PUBLICATION WHERE");
                                755                 :                : 
                                756                 :                :         /* Fix up collation information */
                                757                 :            188 :         assign_expr_collations(pstate, whereclause);
                                758                 :                : 
  401 peter@eisentraut.org      759                 :            188 :         whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
                                760                 :                : 
                                761                 :                :         /*
                                762                 :                :          * We allow only simple expressions in row filters. See
                                763                 :                :          * check_simple_rowfilter_expr_walker.
                                764                 :                :          */
 1482 akapila@postgresql.o      765                 :            188 :         check_simple_rowfilter_expr(whereclause, pstate);
                                766                 :                : 
                                767                 :            167 :         free_parsestate(pstate);
                                768                 :                : 
                                769                 :            167 :         pri->whereClause = whereclause;
                                770                 :                :     }
                                771                 :            565 : }
                                772                 :                : 
                                773                 :                : 
                                774                 :                : /*
                                775                 :                :  * Given a list of tables that are going to be added to a publication,
                                776                 :                :  * verify that they fulfill the necessary preconditions, namely: no tables
                                777                 :                :  * have a column list if any schema is published; and partitioned tables do
                                778                 :                :  * not have column lists if publish_via_partition_root is not set.
                                779                 :                :  *
                                780                 :                :  * 'publish_schema' indicates that the publication contains any TABLES IN
                                781                 :                :  * SCHEMA elements (newly added in this command, or preexisting).
                                782                 :                :  * 'pubviaroot' is the value of publish_via_partition_root.
                                783                 :                :  */
                                784                 :                : static void
 1265 alvherre@alvh.no-ip.      785                 :            565 : CheckPubRelationColumnList(char *pubname, List *tables,
                                786                 :                :                            bool publish_schema, bool pubviaroot)
                                787                 :                : {
                                788                 :                :     ListCell   *lc;
                                789                 :                : 
 1450 tomas.vondra@postgre      790   [ +  +  +  +  :           1153 :     foreach(lc, tables)
                                              +  + ]
                                791                 :                :     {
                                792                 :            603 :         PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
                                793                 :                : 
                                794         [ +  + ]:            603 :         if (pri->columns == NIL)
                                795                 :            401 :             continue;
                                796                 :                : 
                                797                 :                :         /*
                                798                 :                :          * Disallow specifying column list if any schema is in the
                                799                 :                :          * publication.
                                800                 :                :          *
                                801                 :                :          * XXX We could instead just forbid the case when the publication
                                802                 :                :          * tries to publish the table with a column list and a schema for that
                                803                 :                :          * table. However, if we do that then we need a restriction during
                                804                 :                :          * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
                                805                 :                :          * seem to be a good idea.
                                806                 :                :          */
 1269 akapila@postgresql.o      807         [ +  + ]:            202 :         if (publish_schema)
                                808         [ +  - ]:             12 :             ereport(ERROR,
                                809                 :                :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                810                 :                :                     errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
                                811                 :                :                            get_namespace_name(RelationGetNamespace(pri->relation)),
                                812                 :                :                            RelationGetRelationName(pri->relation), pubname),
                                813                 :                :                     errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
                                814                 :                : 
                                815                 :                :         /*
                                816                 :                :          * If the publication doesn't publish changes via the root partitioned
                                817                 :                :          * table, the partition's column list will be used. So disallow using
                                818                 :                :          * a column list on the partitioned table in this case.
                                819                 :                :          */
 1450 tomas.vondra@postgre      820         [ +  + ]:            190 :         if (!pubviaroot &&
                                821         [ +  + ]:            152 :             pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
                                822         [ +  - ]:              3 :             ereport(ERROR,
                                823                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                824                 :                :                      errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
                                825                 :                :                             get_namespace_name(RelationGetNamespace(pri->relation)),
                                826                 :                :                             RelationGetRelationName(pri->relation), pubname),
                                827                 :                :                      errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
                                828                 :                :                                "publish_via_partition_root")));
                                829                 :                :     }
                                830                 :            550 : }
                                831                 :                : 
                                832                 :                : /*
                                833                 :                :  * Create new publication.
                                834                 :                :  */
                                835                 :                : ObjectAddress
 1704 dean.a.rasheed@gmail      836                 :            479 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
                                837                 :                : {
                                838                 :                :     Relation    rel;
                                839                 :                :     ObjectAddress myself;
                                840                 :                :     Oid         puboid;
                                841                 :                :     bool        nulls[Natts_pg_publication];
                                842                 :                :     Datum       values[Natts_pg_publication];
                                843                 :                :     HeapTuple   tup;
                                844                 :                :     bool        publish_given;
                                845                 :                :     PublicationActions pubactions;
                                846                 :                :     bool        publish_via_partition_root_given;
                                847                 :                :     bool        publish_via_partition_root;
                                848                 :                :     bool        publish_generated_columns_given;
                                849                 :                :     char        publish_generated_columns;
                                850                 :                :     AclResult   aclresult;
 1438 tomas.vondra@postgre      851                 :            479 :     List       *relations = NIL;
   11 akapila@postgresql.o      852                 :GNC         479 :     List       *exceptrelations = NIL;
 1438 tomas.vondra@postgre      853                 :CBC         479 :     List       *schemaidlist = NIL;
                                854                 :                : 
                                855                 :                :     /* must have CREATE privilege on database */
 1218 peter@eisentraut.org      856                 :            479 :     aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
 3342 peter_e@gmx.net           857         [ +  + ]:            479 :     if (aclresult != ACLCHECK_OK)
 3025                           858                 :              3 :         aclcheck_error(aclresult, OBJECT_DATABASE,
 3342                           859                 :              3 :                        get_database_name(MyDatabaseId));
                                860                 :                : 
                                861                 :                :     /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
  157 akapila@postgresql.o      862         [ +  + ]:GNC         476 :     if (!superuser())
                                863                 :                :     {
                                864   [ +  -  -  + ]:             12 :         if (stmt->for_all_tables || stmt->for_all_sequences)
  157 akapila@postgresql.o      865         [ #  # ]:UNC           0 :             ereport(ERROR,
                                866                 :                :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                867                 :                :                     errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
                                868                 :                :     }
                                869                 :                : 
 2610 andres@anarazel.de        870                 :CBC         476 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
                                871                 :                : 
                                872                 :                :     /* Check if name is used */
 2672                           873                 :            476 :     puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
                                874                 :                :                              CStringGetDatum(stmt->pubname));
 3342 peter_e@gmx.net           875         [ +  + ]:            476 :     if (OidIsValid(puboid))
                                876         [ +  - ]:              3 :         ereport(ERROR,
                                877                 :                :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                                878                 :                :                  errmsg("publication \"%s\" already exists",
                                879                 :                :                         stmt->pubname)));
                                880                 :                : 
                                881                 :                :     /* Form a tuple. */
                                882                 :            473 :     memset(values, 0, sizeof(values));
                                883                 :            473 :     memset(nulls, false, sizeof(nulls));
                                884                 :                : 
                                885                 :            473 :     values[Anum_pg_publication_pubname - 1] =
                                886                 :            473 :         DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
                                887                 :            473 :     values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
                                888                 :                : 
 1704 dean.a.rasheed@gmail      889                 :            473 :     parse_publication_options(pstate,
                                890                 :                :                               stmt->options,
                                891                 :                :                               &publish_given, &pubactions,
                                892                 :                :                               &publish_via_partition_root_given,
                                893                 :                :                               &publish_via_partition_root,
                                894                 :                :                               &publish_generated_columns_given,
                                895                 :                :                               &publish_generated_columns);
                                896                 :                : 
  157 akapila@postgresql.o      897         [ +  + ]:GNC         455 :     if (stmt->for_all_sequences &&
                                898   [ +  +  +  -  :             14 :         (publish_given || publish_via_partition_root_given ||
                                              -  + ]
                                899                 :                :          publish_generated_columns_given))
                                900         [ +  - ]:              1 :         ereport(NOTICE,
                                901                 :                :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                902                 :                :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
                                903                 :                : 
 2672 andres@anarazel.de        904                 :CBC         455 :     puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
                                905                 :                :                                 Anum_pg_publication_oid);
                                906                 :            455 :     values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
 3342 peter_e@gmx.net           907                 :            455 :     values[Anum_pg_publication_puballtables - 1] =
 1438 tomas.vondra@postgre      908                 :            455 :         BoolGetDatum(stmt->for_all_tables);
  157 akapila@postgresql.o      909                 :GNC         455 :     values[Anum_pg_publication_puballsequences - 1] =
                                910                 :            455 :         BoolGetDatum(stmt->for_all_sequences);
 3342 peter_e@gmx.net           911                 :CBC         455 :     values[Anum_pg_publication_pubinsert - 1] =
 2167 peter@eisentraut.org      912                 :            455 :         BoolGetDatum(pubactions.pubinsert);
 3342 peter_e@gmx.net           913                 :            455 :     values[Anum_pg_publication_pubupdate - 1] =
 2167 peter@eisentraut.org      914                 :            455 :         BoolGetDatum(pubactions.pubupdate);
 3342 peter_e@gmx.net           915                 :            455 :     values[Anum_pg_publication_pubdelete - 1] =
 2167 peter@eisentraut.org      916                 :            455 :         BoolGetDatum(pubactions.pubdelete);
 2899 peter_e@gmx.net           917                 :            455 :     values[Anum_pg_publication_pubtruncate - 1] =
 2167 peter@eisentraut.org      918                 :            455 :         BoolGetDatum(pubactions.pubtruncate);
                                919                 :            455 :     values[Anum_pg_publication_pubviaroot - 1] =
                                920                 :            455 :         BoolGetDatum(publish_via_partition_root);
  411 akapila@postgresql.o      921                 :            455 :     values[Anum_pg_publication_pubgencols - 1] =
  416                           922                 :            455 :         CharGetDatum(publish_generated_columns);
                                923                 :                : 
 3342 peter_e@gmx.net           924                 :            455 :     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
                                925                 :                : 
                                926                 :                :     /* Insert tuple into catalog. */
 2672 andres@anarazel.de        927                 :            455 :     CatalogTupleInsert(rel, tup);
 3342 peter_e@gmx.net           928                 :            455 :     heap_freetuple(tup);
                                929                 :                : 
 3341 alvherre@alvh.no-ip.      930                 :            455 :     recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
                                931                 :                : 
 3342 peter_e@gmx.net           932                 :            455 :     ObjectAddressSet(myself, PublicationRelationId, puboid);
                                933                 :                : 
                                934                 :                :     /* Make the changes visible. */
                                935                 :            455 :     CommandCounterIncrement();
                                936                 :                : 
                                937                 :                :     /* Associate objects with the publication. */
   11 akapila@postgresql.o      938                 :GNC         455 :     ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
                                939                 :                :                                &exceptrelations, &schemaidlist);
                                940                 :                : 
 1438 tomas.vondra@postgre      941         [ +  + ]:CBC         446 :     if (stmt->for_all_tables)
                                942                 :                :     {
                                943                 :                :         /* Process EXCEPT table list */
   11 akapila@postgresql.o      944         [ +  + ]:GNC          82 :         if (exceptrelations != NIL)
                                945                 :                :         {
                                946                 :                :             List       *rels;
                                947                 :                : 
                                948                 :             28 :             rels = OpenTableList(exceptrelations);
                                949                 :             28 :             PublicationAddTables(puboid, rels, true, NULL);
                                950                 :             25 :             CloseTableList(rels);
                                951                 :                :         }
                                952                 :                : 
                                953                 :                :         /*
                                954                 :                :          * Invalidate relcache so that publication info is rebuilt. Sequences
                                955                 :                :          * publication doesn't require invalidation, as replica identity
                                956                 :                :          * checks don't apply to them.
                                957                 :                :          */
 1649 akapila@postgresql.o      958                 :CBC          79 :         CacheInvalidateRelcacheAll();
                                959                 :                :     }
  157 akapila@postgresql.o      960         [ +  + ]:GNC         364 :     else if (!stmt->for_all_sequences)
                                961                 :                :     {
                                962                 :                :         /* FOR TABLES IN SCHEMA requires superuser */
 1306 tgl@sss.pgh.pa.us         963   [ +  +  +  + ]:CBC         354 :         if (schemaidlist != NIL && !superuser())
 1600 akapila@postgresql.o      964         [ +  - ]:              3 :             ereport(ERROR,
                                965                 :                :                     errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                966                 :                :                     errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
                                967                 :                : 
 1306 tgl@sss.pgh.pa.us         968         [ +  + ]:            351 :         if (relations != NIL)
                                969                 :                :         {
                                970                 :                :             List       *rels;
                                971                 :                : 
 1438 tomas.vondra@postgre      972                 :            239 :             rels = OpenTableList(relations);
 1482 akapila@postgresql.o      973                 :            224 :             TransformPubWhereClauses(rels, pstate->p_sourcetext,
                                974                 :                :                                      publish_via_partition_root);
                                975                 :                : 
 1265 alvherre@alvh.no-ip.      976                 :            212 :             CheckPubRelationColumnList(stmt->pubname, rels,
                                977                 :                :                                        schemaidlist != NIL,
                                978                 :                :                                        publish_via_partition_root);
                                979                 :                : 
 1438 tomas.vondra@postgre      980                 :            209 :             PublicationAddTables(puboid, rels, true, NULL);
                                981                 :            196 :             CloseTableList(rels);
                                982                 :                :         }
                                983                 :                : 
 1306 tgl@sss.pgh.pa.us         984         [ +  + ]:            308 :         if (schemaidlist != NIL)
                                985                 :                :         {
                                986                 :                :             /*
                                987                 :                :              * Schema lock is held until the publication is created to prevent
                                988                 :                :              * concurrent schema deletion.
                                989                 :                :              */
 1438 tomas.vondra@postgre      990                 :             76 :             LockSchemaList(schemaidlist);
                                991                 :             76 :             PublicationAddSchemas(puboid, schemaidlist, true, NULL);
                                992                 :                :         }
                                993                 :                :     }
                                994                 :                : 
 2610 andres@anarazel.de        995                 :            394 :     table_close(rel, RowExclusiveLock);
                                996                 :                : 
 3342 peter_e@gmx.net           997         [ -  + ]:            394 :     InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
                                998                 :                : 
                                999                 :                :     /*
                               1000                 :                :      * We don't need this warning message when wal_level >= 'replica' since
                               1001                 :                :      * logical decoding is automatically enabled up on a logical slot
                               1002                 :                :      * creation.
                               1003                 :                :      */
   82 msawada@postgresql.o     1004         [ +  + ]:GNC         394 :     if (wal_level < WAL_LEVEL_REPLICA)
 2437 tmunro@postgresql.or     1005         [ +  - ]:CBC          16 :         ereport(WARNING,
                               1006                 :                :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1007                 :                :                  errmsg("logical decoding must be enabled to publish logical changes"),
                               1008                 :                :                  errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
                               1009                 :                : 
 3342 peter_e@gmx.net          1010                 :            394 :     return myself;
                               1011                 :                : }
                               1012                 :                : 
                               1013                 :                : /*
                               1014                 :                :  * Change options of a publication.
                               1015                 :                :  */
                               1016                 :                : static void
 1704 dean.a.rasheed@gmail     1017                 :             64 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
                               1018                 :                :                         Relation rel, HeapTuple tup)
                               1019                 :                : {
                               1020                 :                :     bool        nulls[Natts_pg_publication];
                               1021                 :                :     bool        replaces[Natts_pg_publication];
                               1022                 :                :     Datum       values[Natts_pg_publication];
                               1023                 :                :     bool        publish_given;
                               1024                 :                :     PublicationActions pubactions;
                               1025                 :                :     bool        publish_via_partition_root_given;
                               1026                 :                :     bool        publish_via_partition_root;
                               1027                 :                :     bool        publish_generated_columns_given;
                               1028                 :                :     char        publish_generated_columns;
                               1029                 :                :     ObjectAddress obj;
                               1030                 :                :     Form_pg_publication pubform;
 1482 akapila@postgresql.o     1031                 :             64 :     List       *root_relids = NIL;
                               1032                 :                :     ListCell   *lc;
                               1033                 :                : 
  157 akapila@postgresql.o     1034                 :GNC          64 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1035                 :                : 
 1704 dean.a.rasheed@gmail     1036                 :CBC          64 :     parse_publication_options(pstate,
                               1037                 :                :                               stmt->options,
                               1038                 :                :                               &publish_given, &pubactions,
                               1039                 :                :                               &publish_via_partition_root_given,
                               1040                 :                :                               &publish_via_partition_root,
                               1041                 :                :                               &publish_generated_columns_given,
                               1042                 :                :                               &publish_generated_columns);
                               1043                 :                : 
  157 akapila@postgresql.o     1044         [ +  + ]:GNC          64 :     if (pubform->puballsequences &&
                               1045   [ +  +  +  -  :              6 :         (publish_given || publish_via_partition_root_given ||
                                              +  - ]
                               1046                 :                :          publish_generated_columns_given))
                               1047         [ +  - ]:              6 :         ereport(NOTICE,
                               1048                 :                :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1049                 :                :                 errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
                               1050                 :                : 
                               1051                 :                :     /*
                               1052                 :                :      * If the publication doesn't publish changes via the root partitioned
                               1053                 :                :      * table, the partition's row filter and column list will be used. So
                               1054                 :                :      * disallow using WHERE clause and column lists on partitioned table in
                               1055                 :                :      * this case.
                               1056                 :                :      */
 1482 akapila@postgresql.o     1057   [ +  +  +  + ]:CBC          64 :     if (!pubform->puballtables && publish_via_partition_root_given &&
                               1058         [ +  + ]:             40 :         !publish_via_partition_root)
                               1059                 :                :     {
                               1060                 :                :         /*
                               1061                 :                :          * Lock the publication so nobody else can do anything with it. This
                               1062                 :                :          * prevents concurrent alter to add partitioned table(s) with WHERE
                               1063                 :                :          * clause(s) and/or column lists which we don't allow when not
                               1064                 :                :          * publishing via root.
                               1065                 :                :          */
                               1066                 :             24 :         LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
                               1067                 :                :                            AccessShareLock);
                               1068                 :                : 
   11 akapila@postgresql.o     1069                 :GNC          24 :         root_relids = GetIncludedPublicationRelations(pubform->oid,
                               1070                 :                :                                                       PUBLICATION_PART_ROOT);
                               1071                 :                : 
 1482 akapila@postgresql.o     1072   [ +  -  +  +  :CBC          42 :         foreach(lc, root_relids)
                                              +  + ]
                               1073                 :                :         {
                               1074                 :             24 :             Oid         relid = lfirst_oid(lc);
                               1075                 :                :             HeapTuple   rftuple;
                               1076                 :                :             char        relkind;
                               1077                 :                :             char       *relname;
                               1078                 :                :             bool        has_rowfilter;
                               1079                 :                :             bool        has_collist;
                               1080                 :                : 
                               1081                 :                :             /*
                               1082                 :                :              * Beware: we don't have lock on the relations, so cope silently
                               1083                 :                :              * with the cache lookups returning NULL.
                               1084                 :                :              */
                               1085                 :                : 
                               1086                 :             24 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
                               1087                 :                :                                       ObjectIdGetDatum(relid),
                               1088                 :                :                                       ObjectIdGetDatum(pubform->oid));
 1432 alvherre@alvh.no-ip.     1089         [ -  + ]:             24 :             if (!HeapTupleIsValid(rftuple))
 1432 alvherre@alvh.no-ip.     1090                 :UBC           0 :                 continue;
 1432 alvherre@alvh.no-ip.     1091                 :CBC          24 :             has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
                               1092                 :             24 :             has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
                               1093   [ +  +  +  + ]:             24 :             if (!has_rowfilter && !has_collist)
                               1094                 :                :             {
                               1095                 :              6 :                 ReleaseSysCache(rftuple);
                               1096                 :              6 :                 continue;
                               1097                 :                :             }
                               1098                 :                : 
                               1099                 :             18 :             relkind = get_rel_relkind(relid);
                               1100         [ +  + ]:             18 :             if (relkind != RELKIND_PARTITIONED_TABLE)
                               1101                 :                :             {
                               1102                 :             12 :                 ReleaseSysCache(rftuple);
                               1103                 :             12 :                 continue;
                               1104                 :                :             }
                               1105                 :              6 :             relname = get_rel_name(relid);
                               1106         [ -  + ]:              6 :             if (relname == NULL)    /* table concurrently dropped */
                               1107                 :                :             {
 1482 akapila@postgresql.o     1108                 :UBC           0 :                 ReleaseSysCache(rftuple);
 1432 alvherre@alvh.no-ip.     1109                 :              0 :                 continue;
                               1110                 :                :             }
                               1111                 :                : 
 1432 alvherre@alvh.no-ip.     1112         [ +  + ]:CBC           6 :             if (has_rowfilter)
                               1113         [ +  - ]:              3 :                 ereport(ERROR,
                               1114                 :                :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               1115                 :                :                          errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
                               1116                 :                :                                 "publish_via_partition_root",
                               1117                 :                :                                 stmt->pubname),
                               1118                 :                :                          errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
                               1119                 :                :                                    relname, "publish_via_partition_root")));
                               1120         [ -  + ]:              3 :             Assert(has_collist);
                               1121         [ +  - ]:              3 :             ereport(ERROR,
                               1122                 :                :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               1123                 :                :                      errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
                               1124                 :                :                             "publish_via_partition_root",
                               1125                 :                :                             stmt->pubname),
                               1126                 :                :                      errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
                               1127                 :                :                                relname, "publish_via_partition_root")));
                               1128                 :                :         }
                               1129                 :                :     }
                               1130                 :                : 
                               1131                 :                :     /* Everything ok, form a new tuple. */
 3342 peter_e@gmx.net          1132                 :             58 :     memset(values, 0, sizeof(values));
                               1133                 :             58 :     memset(nulls, false, sizeof(nulls));
                               1134                 :             58 :     memset(replaces, false, sizeof(replaces));
                               1135                 :                : 
 3229                          1136         [ +  + ]:             58 :     if (publish_given)
                               1137                 :                :     {
 2167 peter@eisentraut.org     1138                 :             17 :         values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
 3342 peter_e@gmx.net          1139                 :             17 :         replaces[Anum_pg_publication_pubinsert - 1] = true;
                               1140                 :                : 
 2167 peter@eisentraut.org     1141                 :             17 :         values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
 3342 peter_e@gmx.net          1142                 :             17 :         replaces[Anum_pg_publication_pubupdate - 1] = true;
                               1143                 :                : 
 2167 peter@eisentraut.org     1144                 :             17 :         values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
 3342 peter_e@gmx.net          1145                 :             17 :         replaces[Anum_pg_publication_pubdelete - 1] = true;
                               1146                 :                : 
 2167 peter@eisentraut.org     1147                 :             17 :         values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
 2899 peter_e@gmx.net          1148                 :             17 :         replaces[Anum_pg_publication_pubtruncate - 1] = true;
                               1149                 :                :     }
                               1150                 :                : 
 2167 peter@eisentraut.org     1151         [ +  + ]:             58 :     if (publish_via_partition_root_given)
                               1152                 :                :     {
                               1153                 :             35 :         values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
                               1154                 :             35 :         replaces[Anum_pg_publication_pubviaroot - 1] = true;
                               1155                 :                :     }
                               1156                 :                : 
  493 akapila@postgresql.o     1157         [ +  + ]:             58 :     if (publish_generated_columns_given)
                               1158                 :                :     {
  411                          1159                 :              6 :         values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
                               1160                 :              6 :         replaces[Anum_pg_publication_pubgencols - 1] = true;
                               1161                 :                :     }
                               1162                 :                : 
 3342 peter_e@gmx.net          1163                 :             58 :     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
                               1164                 :                :                             replaces);
                               1165                 :                : 
                               1166                 :                :     /* Update the catalog. */
 3330 alvherre@alvh.no-ip.     1167                 :             58 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               1168                 :                : 
 3342 peter_e@gmx.net          1169                 :             58 :     CommandCounterIncrement();
                               1170                 :                : 
 2672 andres@anarazel.de       1171                 :             58 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1172                 :                : 
                               1173                 :                :     /* Invalidate the relcache. */
 1438 tomas.vondra@postgre     1174         [ +  + ]:             58 :     if (pubform->puballtables)
                               1175                 :                :     {
 3342 peter_e@gmx.net          1176                 :             10 :         CacheInvalidateRelcacheAll();
                               1177                 :                :     }
                               1178                 :                :     else
                               1179                 :                :     {
 1600 akapila@postgresql.o     1180                 :             48 :         List       *relids = NIL;
                               1181                 :             48 :         List       *schemarelids = NIL;
                               1182                 :                : 
                               1183                 :                :         /*
                               1184                 :                :          * For any partitioned tables contained in the publication, we must
                               1185                 :                :          * invalidate all partitions contained in the respective partition
                               1186                 :                :          * trees, not just those explicitly mentioned in the publication.
                               1187                 :                :          */
 1482                          1188         [ +  + ]:             48 :         if (root_relids == NIL)
   11 akapila@postgresql.o     1189                 :GNC          30 :             relids = GetIncludedPublicationRelations(pubform->oid,
                               1190                 :                :                                                      PUBLICATION_PART_ALL);
                               1191                 :                :         else
                               1192                 :                :         {
                               1193                 :                :             /*
                               1194                 :                :              * We already got tables explicitly mentioned in the publication.
                               1195                 :                :              * Now get all partitions for the partitioned table in the list.
                               1196                 :                :              */
 1482 akapila@postgresql.o     1197   [ +  -  +  +  :CBC          36 :             foreach(lc, root_relids)
                                              +  + ]
                               1198                 :             18 :                 relids = GetPubPartitionOptionRelations(relids,
                               1199                 :                :                                                         PUBLICATION_PART_ALL,
                               1200                 :                :                                                         lfirst_oid(lc));
                               1201                 :                :         }
                               1202                 :                : 
 1600                          1203                 :             48 :         schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
                               1204                 :                :                                                         PUBLICATION_PART_ALL);
                               1205                 :             48 :         relids = list_concat_unique_oid(relids, schemarelids);
                               1206                 :                : 
 1635                          1207                 :             48 :         InvalidatePublicationRels(relids);
                               1208                 :                :     }
                               1209                 :                : 
 2672 andres@anarazel.de       1210                 :             58 :     ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
 3342 peter_e@gmx.net          1211                 :             58 :     EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
                               1212                 :                :                                      (Node *) stmt);
                               1213                 :                : 
 2672 andres@anarazel.de       1214         [ -  + ]:             58 :     InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
 3342 peter_e@gmx.net          1215                 :             58 : }
                               1216                 :                : 
                               1217                 :                : /*
                               1218                 :                :  * Invalidate the relations.
                               1219                 :                :  */
                               1220                 :                : void
 1635 akapila@postgresql.o     1221                 :           1262 : InvalidatePublicationRels(List *relids)
                               1222                 :                : {
                               1223                 :                :     /*
                               1224                 :                :      * We don't want to send too many individual messages, at some point it's
                               1225                 :                :      * cheaper to just reset whole relcache.
                               1226                 :                :      */
                               1227         [ +  - ]:           1262 :     if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
                               1228                 :                :     {
                               1229                 :                :         ListCell   *lc;
                               1230                 :                : 
                               1231   [ +  +  +  +  :          10414 :         foreach(lc, relids)
                                              +  + ]
                               1232                 :           9152 :             CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
                               1233                 :                :     }
                               1234                 :                :     else
 1635 akapila@postgresql.o     1235                 :UBC           0 :         CacheInvalidateRelcacheAll();
 1635 akapila@postgresql.o     1236                 :CBC        1262 : }
                               1237                 :                : 
                               1238                 :                : /*
                               1239                 :                :  * Add or remove table to/from publication.
                               1240                 :                :  */
                               1241                 :                : static void
 1600                          1242                 :            459 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
                               1243                 :                :                        List *tables, const char *queryString,
                               1244                 :                :                        bool publish_schema)
                               1245                 :                : {
 3342 peter_e@gmx.net          1246                 :            459 :     List       *rels = NIL;
                               1247                 :            459 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
 2672 andres@anarazel.de       1248                 :            459 :     Oid         pubid = pubform->oid;
                               1249                 :                : 
                               1250                 :                :     /*
                               1251                 :                :      * Nothing to do if no objects, except in SET: for that it is quite
                               1252                 :                :      * possible that user has not specified any tables in which case we need
                               1253                 :                :      * to remove all the existing tables.
                               1254                 :                :      */
 1532 alvherre@alvh.no-ip.     1255   [ +  +  +  + ]:            459 :     if (!tables && stmt->action != AP_SetObjects)
 1600 akapila@postgresql.o     1256                 :             38 :         return;
                               1257                 :                : 
 1438 tomas.vondra@postgre     1258                 :            421 :     rels = OpenTableList(tables);
                               1259                 :                : 
 1532 alvherre@alvh.no-ip.     1260         [ +  + ]:            421 :     if (stmt->action == AP_AddObjects)
                               1261                 :                :     {
 1482 akapila@postgresql.o     1262                 :            148 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
                               1263                 :                : 
 1269                          1264                 :            139 :         publish_schema |= is_schema_publication(pubid);
                               1265                 :                : 
 1265 alvherre@alvh.no-ip.     1266                 :            139 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
 1269 akapila@postgresql.o     1267                 :            139 :                                    pubform->pubviaroot);
                               1268                 :                : 
 1438 tomas.vondra@postgre     1269                 :            133 :         PublicationAddTables(pubid, rels, false, stmt);
                               1270                 :                :     }
 1532 alvherre@alvh.no-ip.     1271         [ +  + ]:            273 :     else if (stmt->action == AP_DropObjects)
 1438 tomas.vondra@postgre     1272                 :             50 :         PublicationDropTables(pubid, rels, false);
                               1273                 :                :     else                        /* AP_SetObjects */
                               1274                 :                :     {
   11 akapila@postgresql.o     1275                 :GNC         223 :         List       *oldrelids = GetIncludedPublicationRelations(pubid,
                               1276                 :                :                                                                 PUBLICATION_PART_ROOT);
 3342 peter_e@gmx.net          1277                 :CBC         223 :         List       *delrels = NIL;
                               1278                 :                :         ListCell   *oldlc;
                               1279                 :                : 
 1482 akapila@postgresql.o     1280                 :            223 :         TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
                               1281                 :                : 
 1265 alvherre@alvh.no-ip.     1282                 :            214 :         CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
 1269 akapila@postgresql.o     1283                 :            214 :                                    pubform->pubviaroot);
                               1284                 :                : 
                               1285                 :                :         /*
                               1286                 :                :          * To recreate the relation list for the publication, look for
                               1287                 :                :          * existing relations that do not need to be dropped.
                               1288                 :                :          */
 3342 peter_e@gmx.net          1289   [ +  +  +  +  :            403 :         foreach(oldlc, oldrelids)
                                              +  + ]
                               1290                 :                :         {
                               1291                 :            201 :             Oid         oldrelid = lfirst_oid(oldlc);
                               1292                 :                :             ListCell   *newlc;
                               1293                 :                :             PublicationRelInfo *oldrel;
                               1294                 :            201 :             bool        found = false;
                               1295                 :                :             HeapTuple   rftuple;
 1482 akapila@postgresql.o     1296                 :            201 :             Node       *oldrelwhereclause = NULL;
 1450 tomas.vondra@postgre     1297                 :            201 :             Bitmapset  *oldcolumns = NULL;
                               1298                 :                : 
                               1299                 :                :             /* look up the cache for the old relmap */
 1482 akapila@postgresql.o     1300                 :            201 :             rftuple = SearchSysCache2(PUBLICATIONRELMAP,
                               1301                 :                :                                       ObjectIdGetDatum(oldrelid),
                               1302                 :                :                                       ObjectIdGetDatum(pubid));
                               1303                 :                : 
                               1304                 :                :             /*
                               1305                 :                :              * See if the existing relation currently has a WHERE clause or a
                               1306                 :                :              * column list. We need to compare those too.
                               1307                 :                :              */
                               1308         [ +  - ]:            201 :             if (HeapTupleIsValid(rftuple))
                               1309                 :                :             {
 1450 tomas.vondra@postgre     1310                 :            201 :                 bool        isnull = true;
                               1311                 :                :                 Datum       whereClauseDatum;
                               1312                 :                :                 Datum       columnListDatum;
                               1313                 :                : 
                               1314                 :                :                 /* Load the WHERE clause for this table. */
 1482 akapila@postgresql.o     1315                 :            201 :                 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
                               1316                 :                :                                                    Anum_pg_publication_rel_prqual,
                               1317                 :                :                                                    &isnull);
 1450 tomas.vondra@postgre     1318         [ +  + ]:            201 :                 if (!isnull)
 1482 akapila@postgresql.o     1319                 :            105 :                     oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
                               1320                 :                : 
                               1321                 :                :                 /* Transform the int2vector column list to a bitmap. */
 1450 tomas.vondra@postgre     1322                 :            201 :                 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
                               1323                 :                :                                                   Anum_pg_publication_rel_prattrs,
                               1324                 :                :                                                   &isnull);
                               1325                 :                : 
                               1326         [ +  + ]:            201 :                 if (!isnull)
                               1327                 :             68 :                     oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
                               1328                 :                : 
 1482 akapila@postgresql.o     1329                 :            201 :                 ReleaseSysCache(rftuple);
                               1330                 :                :             }
                               1331                 :                : 
 3342 peter_e@gmx.net          1332   [ +  +  +  +  :            389 :             foreach(newlc, rels)
                                              +  + ]
                               1333                 :                :             {
                               1334                 :                :                 PublicationRelInfo *newpubrel;
                               1335                 :                :                 Oid         newrelid;
 1403 tgl@sss.pgh.pa.us        1336                 :            202 :                 Bitmapset  *newcolumns = NULL;
                               1337                 :                : 
 1651 alvherre@alvh.no-ip.     1338                 :            202 :                 newpubrel = (PublicationRelInfo *) lfirst(newlc);
 1450 tomas.vondra@postgre     1339                 :            202 :                 newrelid = RelationGetRelid(newpubrel->relation);
                               1340                 :                : 
                               1341                 :                :                 /*
                               1342                 :                :                  * Validate the column list.  If the column list or WHERE
                               1343                 :                :                  * clause changes, then the validation done here will be
                               1344                 :                :                  * duplicated inside PublicationAddTables().  The validation
                               1345                 :                :                  * is cheap enough that that seems harmless.
                               1346                 :                :                  */
  577 drowley@postgresql.o     1347                 :            202 :                 newcolumns = pub_collist_validate(newpubrel->relation,
                               1348                 :                :                                                   newpubrel->columns);
                               1349                 :                : 
                               1350                 :                :                 /*
                               1351                 :                :                  * Check if any of the new set of relations matches with the
                               1352                 :                :                  * existing relations in the publication. Additionally, if the
                               1353                 :                :                  * relation has an associated WHERE clause, check the WHERE
                               1354                 :                :                  * expressions also match. Same for the column list. Drop the
                               1355                 :                :                  * rest.
                               1356                 :                :                  */
                               1357         [ +  + ]:            196 :                 if (newrelid == oldrelid)
                               1358                 :                :                 {
 1450 tomas.vondra@postgre     1359   [ +  +  +  + ]:            142 :                     if (equal(oldrelwhereclause, newpubrel->whereClause) &&
                               1360                 :             40 :                         bms_equal(oldcolumns, newcolumns))
                               1361                 :                :                     {
 1482 akapila@postgresql.o     1362                 :              8 :                         found = true;
                               1363                 :              8 :                         break;
                               1364                 :                :                     }
                               1365                 :                :                 }
                               1366                 :                :             }
                               1367                 :                : 
                               1368                 :                :             /*
                               1369                 :                :              * Add the non-matched relations to a list so that they can be
                               1370                 :                :              * dropped.
                               1371                 :                :              */
                               1372         [ +  + ]:            195 :             if (!found)
                               1373                 :                :             {
   95 michael@paquier.xyz      1374                 :GNC         187 :                 oldrel = palloc_object(PublicationRelInfo);
 1482 akapila@postgresql.o     1375                 :CBC         187 :                 oldrel->whereClause = NULL;
 1450 tomas.vondra@postgre     1376                 :            187 :                 oldrel->columns = NIL;
   11 akapila@postgresql.o     1377                 :GNC         187 :                 oldrel->except = false;
 1482 akapila@postgresql.o     1378                 :CBC         187 :                 oldrel->relation = table_open(oldrelid,
                               1379                 :                :                                               ShareUpdateExclusiveLock);
                               1380                 :            187 :                 delrels = lappend(delrels, oldrel);
                               1381                 :                :             }
                               1382                 :                :         }
                               1383                 :                : 
                               1384                 :                :         /* And drop them. */
 1438 tomas.vondra@postgre     1385                 :            202 :         PublicationDropTables(pubid, delrels, true);
                               1386                 :                : 
                               1387                 :                :         /*
                               1388                 :                :          * Don't bother calculating the difference for adding, we'll catch and
                               1389                 :                :          * skip existing ones when doing catalog update.
                               1390                 :                :          */
                               1391                 :            202 :         PublicationAddTables(pubid, rels, true, stmt);
                               1392                 :                : 
                               1393                 :            202 :         CloseTableList(delrels);
                               1394                 :                :     }
                               1395                 :                : 
                               1396                 :            355 :     CloseTableList(rels);
                               1397                 :                : }
                               1398                 :                : 
                               1399                 :                : /*
                               1400                 :                :  * Alter the publication schemas.
                               1401                 :                :  *
                               1402                 :                :  * Add or remove schemas to/from publication.
                               1403                 :                :  */
                               1404                 :                : static void
 1600 akapila@postgresql.o     1405                 :            393 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
                               1406                 :                :                         HeapTuple tup, List *schemaidlist)
                               1407                 :                : {
                               1408                 :            393 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1409                 :                : 
                               1410                 :                :     /*
                               1411                 :                :      * Nothing to do if no objects, except in SET: for that it is quite
                               1412                 :                :      * possible that user has not specified any schemas in which case we need
                               1413                 :                :      * to remove all the existing schemas.
                               1414                 :                :      */
 1532 alvherre@alvh.no-ip.     1415   [ +  +  +  + ]:            393 :     if (!schemaidlist && stmt->action != AP_SetObjects)
 1600 akapila@postgresql.o     1416                 :            153 :         return;
                               1417                 :                : 
                               1418                 :                :     /*
                               1419                 :                :      * Schema lock is held until the publication is altered to prevent
                               1420                 :                :      * concurrent schema deletion.
                               1421                 :                :      */
                               1422                 :            240 :     LockSchemaList(schemaidlist);
 1532 alvherre@alvh.no-ip.     1423         [ +  + ]:            240 :     if (stmt->action == AP_AddObjects)
                               1424                 :                :     {
                               1425                 :                :         ListCell   *lc;
                               1426                 :                :         List       *reloids;
                               1427                 :                : 
   11 akapila@postgresql.o     1428                 :GNC          19 :         reloids = GetIncludedPublicationRelations(pubform->oid,
                               1429                 :                :                                                   PUBLICATION_PART_ROOT);
                               1430                 :                : 
 1269 akapila@postgresql.o     1431   [ +  +  +  +  :CBC          27 :         foreach(lc, reloids)
                                              +  + ]
                               1432                 :                :         {
                               1433                 :                :             HeapTuple   coltuple;
                               1434                 :                : 
                               1435                 :             11 :             coltuple = SearchSysCache2(PUBLICATIONRELMAP,
                               1436                 :                :                                        ObjectIdGetDatum(lfirst_oid(lc)),
                               1437                 :                :                                        ObjectIdGetDatum(pubform->oid));
                               1438                 :                : 
                               1439         [ -  + ]:             11 :             if (!HeapTupleIsValid(coltuple))
 1269 akapila@postgresql.o     1440                 :UBC           0 :                 continue;
                               1441                 :                : 
                               1442                 :                :             /*
                               1443                 :                :              * Disallow adding schema if column list is already part of the
                               1444                 :                :              * publication. See CheckPubRelationColumnList.
                               1445                 :                :              */
 1269 akapila@postgresql.o     1446         [ +  + ]:CBC          11 :             if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
                               1447         [ +  - ]:              3 :                 ereport(ERROR,
                               1448                 :                :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                               1449                 :                :                         errmsg("cannot add schema to publication \"%s\"",
                               1450                 :                :                                stmt->pubname),
                               1451                 :                :                         errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
                               1452                 :                : 
                               1453                 :              8 :             ReleaseSysCache(coltuple);
                               1454                 :                :         }
                               1455                 :                : 
 1438 tomas.vondra@postgre     1456                 :             16 :         PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
                               1457                 :                :     }
 1532 alvherre@alvh.no-ip.     1458         [ +  + ]:            221 :     else if (stmt->action == AP_DropObjects)
 1438 tomas.vondra@postgre     1459                 :             19 :         PublicationDropSchemas(pubform->oid, schemaidlist, false);
                               1460                 :                :     else                        /* AP_SetObjects */
                               1461                 :                :     {
                               1462                 :            202 :         List       *oldschemaids = GetPublicationSchemas(pubform->oid);
 1600 akapila@postgresql.o     1463                 :            202 :         List       *delschemas = NIL;
                               1464                 :                : 
                               1465                 :                :         /* Identify which schemas should be dropped */
                               1466                 :            202 :         delschemas = list_difference_oid(oldschemaids, schemaidlist);
                               1467                 :                : 
                               1468                 :                :         /*
                               1469                 :                :          * Schema lock is held until the publication is altered to prevent
                               1470                 :                :          * concurrent schema deletion.
                               1471                 :                :          */
                               1472                 :            202 :         LockSchemaList(delschemas);
                               1473                 :                : 
                               1474                 :                :         /* And drop them */
 1438 tomas.vondra@postgre     1475                 :            202 :         PublicationDropSchemas(pubform->oid, delschemas, true);
                               1476                 :                : 
                               1477                 :                :         /*
                               1478                 :                :          * Don't bother calculating the difference for adding, we'll catch and
                               1479                 :                :          * skip existing ones when doing catalog update.
                               1480                 :                :          */
                               1481                 :            202 :         PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
                               1482                 :                :     }
                               1483                 :                : }
                               1484                 :                : 
                               1485                 :                : /*
                               1486                 :                :  * Check if relations and schemas can be in a given publication and throw
                               1487                 :                :  * appropriate error if not.
                               1488                 :                :  */
                               1489                 :                : static void
 1600 akapila@postgresql.o     1490                 :            480 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
                               1491                 :                :                       List *tables, List *schemaidlist)
                               1492                 :                : {
                               1493                 :            480 :     Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1494                 :                : 
 1532 alvherre@alvh.no-ip.     1495   [ +  +  +  +  :            480 :     if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
                                              +  + ]
 1438 tomas.vondra@postgre     1496         [ +  + ]:             52 :         schemaidlist && !superuser())
 1600 akapila@postgresql.o     1497         [ +  - ]:              3 :         ereport(ERROR,
                               1498                 :                :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               1499                 :                :                  errmsg("must be superuser to add or set schemas")));
                               1500                 :                : 
                               1501                 :                :     /*
                               1502                 :                :      * Check that user is allowed to manipulate the publication tables in
                               1503                 :                :      * schema
                               1504                 :                :      */
  157 akapila@postgresql.o     1505   [ +  +  +  +  :GNC         477 :     if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
                                              -  + ]
                               1506                 :                :     {
                               1507   [ +  -  -  + ]:              9 :         if (pubform->puballtables && pubform->puballsequences)
  157 akapila@postgresql.o     1508         [ #  # ]:UNC           0 :             ereport(ERROR,
                               1509                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1510                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
                               1511                 :                :                            NameStr(pubform->pubname)),
                               1512                 :                :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
  157 akapila@postgresql.o     1513         [ +  - ]:GNC           9 :         else if (pubform->puballtables)
                               1514         [ +  - ]:              9 :             ereport(ERROR,
                               1515                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1516                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
                               1517                 :                :                            NameStr(pubform->pubname)),
                               1518                 :                :                     errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
                               1519                 :                :         else
  157 akapila@postgresql.o     1520         [ #  # ]:UNC           0 :             ereport(ERROR,
                               1521                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1522                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
                               1523                 :                :                            NameStr(pubform->pubname)),
                               1524                 :                :                     errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
                               1525                 :                :     }
                               1526                 :                : 
                               1527                 :                :     /* Check that user is allowed to manipulate the publication tables. */
  157 akapila@postgresql.o     1528   [ +  +  +  +  :GNC         468 :     if (tables && (pubform->puballtables || pubform->puballsequences))
                                              -  + ]
                               1529                 :                :     {
                               1530   [ +  -  -  + ]:              9 :         if (pubform->puballtables && pubform->puballsequences)
  157 akapila@postgresql.o     1531         [ #  # ]:UNC           0 :             ereport(ERROR,
                               1532                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1533                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
                               1534                 :                :                            NameStr(pubform->pubname)),
                               1535                 :                :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
  157 akapila@postgresql.o     1536         [ +  - ]:GNC           9 :         else if (pubform->puballtables)
                               1537         [ +  - ]:              9 :             ereport(ERROR,
                               1538                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1539                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL TABLES",
                               1540                 :                :                            NameStr(pubform->pubname)),
                               1541                 :                :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
                               1542                 :                :         else
  157 akapila@postgresql.o     1543         [ #  # ]:UNC           0 :             ereport(ERROR,
                               1544                 :                :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                               1545                 :                :                     errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
                               1546                 :                :                            NameStr(pubform->pubname)),
                               1547                 :                :                     errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
                               1548                 :                :     }
 1600 akapila@postgresql.o     1549                 :CBC         459 : }
                               1550                 :                : 
                               1551                 :                : /*
                               1552                 :                :  * Alter the existing publication.
                               1553                 :                :  *
                               1554                 :                :  * This is dispatcher function for AlterPublicationOptions,
                               1555                 :                :  * AlterPublicationSchemas and AlterPublicationTables.
                               1556                 :                :  */
                               1557                 :                : void
 1704 dean.a.rasheed@gmail     1558                 :            553 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
                               1559                 :                : {
                               1560                 :                :     Relation    rel;
                               1561                 :                :     HeapTuple   tup;
                               1562                 :                :     Form_pg_publication pubform;
                               1563                 :                : 
 2610 andres@anarazel.de       1564                 :            553 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
                               1565                 :                : 
 3342 peter_e@gmx.net          1566                 :            553 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME,
                               1567                 :                :                               CStringGetDatum(stmt->pubname));
                               1568                 :                : 
                               1569         [ -  + ]:            553 :     if (!HeapTupleIsValid(tup))
 3342 peter_e@gmx.net          1570         [ #  # ]:UBC           0 :         ereport(ERROR,
                               1571                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1572                 :                :                  errmsg("publication \"%s\" does not exist",
                               1573                 :                :                         stmt->pubname)));
                               1574                 :                : 
 2672 andres@anarazel.de       1575                 :CBC         553 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1576                 :                : 
                               1577                 :                :     /* must be owner */
 1218 peter@eisentraut.org     1578         [ -  + ]:            553 :     if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
 3025 peter_e@gmx.net          1579                 :UBC           0 :         aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
 3342                          1580                 :              0 :                        stmt->pubname);
                               1581                 :                : 
 3342 peter_e@gmx.net          1582         [ +  + ]:CBC         553 :     if (stmt->options)
 1704 dean.a.rasheed@gmail     1583                 :             64 :         AlterPublicationOptions(pstate, stmt, rel, tup);
                               1584                 :                :     else
                               1585                 :                :     {
 1438 tomas.vondra@postgre     1586                 :            489 :         List       *relations = NIL;
   11 akapila@postgresql.o     1587                 :GNC         489 :         List       *exceptrelations = NIL;
 1438 tomas.vondra@postgre     1588                 :CBC         489 :         List       *schemaidlist = NIL;
 1482 akapila@postgresql.o     1589                 :            489 :         Oid         pubid = pubform->oid;
                               1590                 :                : 
 1438 tomas.vondra@postgre     1591                 :            489 :         ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
                               1592                 :                :                                    &exceptrelations, &schemaidlist);
                               1593                 :                : 
                               1594                 :                :         /* EXCEPT clause is not supported with ALTER PUBLICATION */
   11 akapila@postgresql.o     1595         [ -  + ]:GNC         480 :         Assert(exceptrelations == NIL);
                               1596                 :                : 
 1438 tomas.vondra@postgre     1597                 :CBC         480 :         CheckAlterPublication(stmt, tup, relations, schemaidlist);
                               1598                 :                : 
 1482 akapila@postgresql.o     1599                 :            459 :         heap_freetuple(tup);
                               1600                 :                : 
                               1601                 :                :         /* Lock the publication so nobody else can do anything with it. */
                               1602                 :            459 :         LockDatabaseObject(PublicationRelationId, pubid, 0,
                               1603                 :                :                            AccessExclusiveLock);
                               1604                 :                : 
                               1605                 :                :         /*
                               1606                 :                :          * It is possible that by the time we acquire the lock on publication,
                               1607                 :                :          * concurrent DDL has removed it. We can test this by checking the
                               1608                 :                :          * existence of publication. We get the tuple again to avoid the risk
                               1609                 :                :          * of any publication option getting changed.
                               1610                 :                :          */
                               1611                 :            459 :         tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
                               1612         [ -  + ]:            459 :         if (!HeapTupleIsValid(tup))
 1600 akapila@postgresql.o     1613         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1614                 :                :                     errcode(ERRCODE_UNDEFINED_OBJECT),
                               1615                 :                :                     errmsg("publication \"%s\" does not exist",
                               1616                 :                :                            stmt->pubname));
                               1617                 :                : 
 1269 akapila@postgresql.o     1618                 :CBC         459 :         AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
                               1619                 :                :                                schemaidlist != NIL);
 1438 tomas.vondra@postgre     1620                 :            393 :         AlterPublicationSchemas(stmt, tup, schemaidlist);
                               1621                 :                :     }
                               1622                 :                : 
                               1623                 :                :     /* Cleanup. */
 3342 peter_e@gmx.net          1624                 :            442 :     heap_freetuple(tup);
 2610 andres@anarazel.de       1625                 :            442 :     table_close(rel, RowExclusiveLock);
 3342 peter_e@gmx.net          1626                 :            442 : }
                               1627                 :                : 
                               1628                 :                : /*
                               1629                 :                :  * Remove relation from publication by mapping OID.
                               1630                 :                :  */
                               1631                 :                : void
                               1632                 :            459 : RemovePublicationRelById(Oid proid)
                               1633                 :                : {
                               1634                 :                :     Relation    rel;
                               1635                 :                :     HeapTuple   tup;
                               1636                 :                :     Form_pg_publication_rel pubrel;
 1635 akapila@postgresql.o     1637                 :            459 :     List       *relids = NIL;
                               1638                 :                : 
 2610 andres@anarazel.de       1639                 :            459 :     rel = table_open(PublicationRelRelationId, RowExclusiveLock);
                               1640                 :                : 
 3342 peter_e@gmx.net          1641                 :            459 :     tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
                               1642                 :                : 
                               1643         [ -  + ]:            459 :     if (!HeapTupleIsValid(tup))
 3342 peter_e@gmx.net          1644         [ #  # ]:UBC           0 :         elog(ERROR, "cache lookup failed for publication table %u",
                               1645                 :                :              proid);
                               1646                 :                : 
 3342 peter_e@gmx.net          1647                 :CBC         459 :     pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
                               1648                 :                : 
                               1649                 :                :     /*
                               1650                 :                :      * Invalidate relcache so that publication info is rebuilt.
                               1651                 :                :      *
                               1652                 :                :      * For the partitioned tables, we must invalidate all partitions contained
                               1653                 :                :      * in the respective partition hierarchies, not just the one explicitly
                               1654                 :                :      * mentioned in the publication. This is required because we implicitly
                               1655                 :                :      * publish the child tables when the parent table is published.
                               1656                 :                :      */
 1635 akapila@postgresql.o     1657                 :            459 :     relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
                               1658                 :                :                                             pubrel->prrelid);
                               1659                 :                : 
                               1660                 :            459 :     InvalidatePublicationRels(relids);
                               1661                 :                : 
 3329 tgl@sss.pgh.pa.us        1662                 :            459 :     CatalogTupleDelete(rel, &tup->t_self);
                               1663                 :                : 
 3342 peter_e@gmx.net          1664                 :            459 :     ReleaseSysCache(tup);
                               1665                 :                : 
 2610 andres@anarazel.de       1666                 :            459 :     table_close(rel, RowExclusiveLock);
 3342 peter_e@gmx.net          1667                 :            459 : }
                               1668                 :                : 
                               1669                 :                : /*
                               1670                 :                :  * Remove the publication by mapping OID.
                               1671                 :                :  */
                               1672                 :                : void
 1649 akapila@postgresql.o     1673                 :            274 : RemovePublicationById(Oid pubid)
                               1674                 :                : {
                               1675                 :                :     Relation    rel;
                               1676                 :                :     HeapTuple   tup;
                               1677                 :                :     Form_pg_publication pubform;
                               1678                 :                : 
                               1679                 :            274 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
                               1680                 :                : 
                               1681                 :            274 :     tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
                               1682         [ -  + ]:            274 :     if (!HeapTupleIsValid(tup))
 1649 akapila@postgresql.o     1683         [ #  # ]:UBC           0 :         elog(ERROR, "cache lookup failed for publication %u", pubid);
                               1684                 :                : 
 1536 alvherre@alvh.no-ip.     1685                 :CBC         274 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
                               1686                 :                : 
                               1687                 :                :     /* Invalidate relcache so that publication info is rebuilt. */
 1438 tomas.vondra@postgre     1688         [ +  + ]:            274 :     if (pubform->puballtables)
 1649 akapila@postgresql.o     1689                 :             58 :         CacheInvalidateRelcacheAll();
                               1690                 :                : 
                               1691                 :            274 :     CatalogTupleDelete(rel, &tup->t_self);
                               1692                 :                : 
                               1693                 :            274 :     ReleaseSysCache(tup);
                               1694                 :                : 
                               1695                 :            274 :     table_close(rel, RowExclusiveLock);
                               1696                 :            274 : }
                               1697                 :                : 
                               1698                 :                : /*
                               1699                 :                :  * Remove schema from publication by mapping OID.
                               1700                 :                :  */
                               1701                 :                : void
 1600                          1702                 :             96 : RemovePublicationSchemaById(Oid psoid)
                               1703                 :                : {
                               1704                 :                :     Relation    rel;
                               1705                 :                :     HeapTuple   tup;
                               1706                 :             96 :     List       *schemaRels = NIL;
                               1707                 :                :     Form_pg_publication_namespace pubsch;
                               1708                 :                : 
                               1709                 :             96 :     rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
                               1710                 :                : 
                               1711                 :             96 :     tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
                               1712                 :                : 
                               1713         [ -  + ]:             96 :     if (!HeapTupleIsValid(tup))
 1600 akapila@postgresql.o     1714         [ #  # ]:UBC           0 :         elog(ERROR, "cache lookup failed for publication schema %u", psoid);
                               1715                 :                : 
 1600 akapila@postgresql.o     1716                 :CBC          96 :     pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
                               1717                 :                : 
                               1718                 :                :     /*
                               1719                 :                :      * Invalidate relcache so that publication info is rebuilt. See
                               1720                 :                :      * RemovePublicationRelById for why we need to consider all the
                               1721                 :                :      * partitions.
                               1722                 :                :      */
                               1723                 :             96 :     schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
                               1724                 :                :                                                PUBLICATION_PART_ALL);
                               1725                 :             96 :     InvalidatePublicationRels(schemaRels);
                               1726                 :                : 
                               1727                 :             96 :     CatalogTupleDelete(rel, &tup->t_self);
                               1728                 :                : 
                               1729                 :             96 :     ReleaseSysCache(tup);
                               1730                 :                : 
                               1731                 :             96 :     table_close(rel, RowExclusiveLock);
                               1732                 :             96 : }
                               1733                 :                : 
                               1734                 :                : /*
                               1735                 :                :  * Open relations specified by a PublicationTable list.
                               1736                 :                :  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
                               1737                 :                :  * add them to a publication.
                               1738                 :                :  */
                               1739                 :                : static List *
 1438 tomas.vondra@postgre     1740                 :            688 : OpenTableList(List *tables)
                               1741                 :                : {
 3342 peter_e@gmx.net          1742                 :            688 :     List       *relids = NIL;
 1438 tomas.vondra@postgre     1743                 :            688 :     List       *rels = NIL;
                               1744                 :                :     ListCell   *lc;
 1482 akapila@postgresql.o     1745                 :            688 :     List       *relids_with_rf = NIL;
 1450 tomas.vondra@postgre     1746                 :            688 :     List       *relids_with_collist = NIL;
                               1747                 :                : 
                               1748                 :                :     /*
                               1749                 :                :      * Open, share-lock, and check all the explicitly-specified relations
                               1750                 :                :      */
 1438                          1751   [ +  +  +  +  :           1416 :     foreach(lc, tables)
                                              +  + ]
                               1752                 :                :     {
 1651 alvherre@alvh.no-ip.     1753                 :            743 :         PublicationTable *t = lfirst_node(PublicationTable, lc);
                               1754                 :            743 :         bool        recurse = t->relation->inh;
                               1755                 :                :         Relation    rel;
                               1756                 :                :         Oid         myrelid;
                               1757                 :                :         PublicationRelInfo *pub_rel;
                               1758                 :                : 
                               1759                 :                :         /* Allow query cancel in case this takes a long time */
 3342 peter_e@gmx.net          1760         [ -  + ]:            743 :         CHECK_FOR_INTERRUPTS();
                               1761                 :                : 
 1651 alvherre@alvh.no-ip.     1762                 :            743 :         rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
 3342 peter_e@gmx.net          1763                 :            740 :         myrelid = RelationGetRelid(rel);
                               1764                 :                : 
                               1765                 :                :         /*
                               1766                 :                :          * Filter out duplicates if user specifies "foo, foo".
                               1767                 :                :          *
                               1768                 :                :          * Note that this algorithm is known to not be very efficient (O(N^2))
                               1769                 :                :          * but given that it only works on list of tables given to us by user
                               1770                 :                :          * it's deemed acceptable.
                               1771                 :                :          */
                               1772         [ +  + ]:            740 :         if (list_member_oid(relids, myrelid))
                               1773                 :                :         {
                               1774                 :                :             /* Disallow duplicate tables if there are any with row filters. */
 1482 akapila@postgresql.o     1775   [ +  +  +  + ]:             12 :             if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
                               1776         [ +  - ]:              6 :                 ereport(ERROR,
                               1777                 :                :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1778                 :                :                          errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
                               1779                 :                :                                 RelationGetRelationName(rel))));
                               1780                 :                : 
                               1781                 :                :             /* Disallow duplicate tables if there are any with column lists. */
 1450 tomas.vondra@postgre     1782   [ +  +  +  - ]:              6 :             if (t->columns || list_member_oid(relids_with_collist, myrelid))
                               1783         [ +  - ]:              6 :                 ereport(ERROR,
                               1784                 :                :                         (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1785                 :                :                          errmsg("conflicting or redundant column lists for table \"%s\"",
                               1786                 :                :                                 RelationGetRelationName(rel))));
                               1787                 :                : 
 2610 andres@anarazel.de       1788                 :UBC           0 :             table_close(rel, ShareUpdateExclusiveLock);
 3342 peter_e@gmx.net          1789                 :              0 :             continue;
                               1790                 :                :         }
                               1791                 :                : 
   95 michael@paquier.xyz      1792                 :GNC         728 :         pub_rel = palloc_object(PublicationRelInfo);
 1651 alvherre@alvh.no-ip.     1793                 :CBC         728 :         pub_rel->relation = rel;
 1482 akapila@postgresql.o     1794                 :            728 :         pub_rel->whereClause = t->whereClause;
 1450 tomas.vondra@postgre     1795                 :            728 :         pub_rel->columns = t->columns;
   11 akapila@postgresql.o     1796                 :GNC         728 :         pub_rel->except = t->except;
 1438 tomas.vondra@postgre     1797                 :CBC         728 :         rels = lappend(rels, pub_rel);
 3342 peter_e@gmx.net          1798                 :            728 :         relids = lappend_oid(relids, myrelid);
                               1799                 :                : 
 1482 akapila@postgresql.o     1800         [ +  + ]:            728 :         if (t->whereClause)
                               1801                 :            202 :             relids_with_rf = lappend_oid(relids_with_rf, myrelid);
                               1802                 :                : 
 1450 tomas.vondra@postgre     1803         [ +  + ]:            728 :         if (t->columns)
                               1804                 :            205 :             relids_with_collist = lappend_oid(relids_with_collist, myrelid);
                               1805                 :                : 
                               1806                 :                :         /*
                               1807                 :                :          * Add children of this rel, if requested, so that they too are added
                               1808                 :                :          * to the publication.  A partitioned table can't have any inheritance
                               1809                 :                :          * children other than its partitions, which need not be explicitly
                               1810                 :                :          * added to the publication.
                               1811                 :                :          */
 2196 peter@eisentraut.org     1812   [ +  +  +  + ]:            728 :         if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
                               1813                 :                :         {
                               1814                 :                :             List       *children;
                               1815                 :                :             ListCell   *child;
                               1816                 :                : 
 3342 peter_e@gmx.net          1817                 :            607 :             children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
                               1818                 :                :                                            NULL);
                               1819                 :                : 
                               1820   [ +  -  +  +  :           1226 :             foreach(child, children)
                                              +  + ]
                               1821                 :                :             {
                               1822                 :            619 :                 Oid         childrelid = lfirst_oid(child);
                               1823                 :                : 
                               1824                 :                :                 /* Allow query cancel in case this takes a long time */
 2655 tgl@sss.pgh.pa.us        1825         [ -  + ]:            619 :                 CHECK_FOR_INTERRUPTS();
                               1826                 :                : 
                               1827                 :                :                 /*
                               1828                 :                :                  * Skip duplicates if user specified both parent and child
                               1829                 :                :                  * tables.
                               1830                 :                :                  */
 3342 peter_e@gmx.net          1831         [ +  + ]:            619 :                 if (list_member_oid(relids, childrelid))
                               1832                 :                :                 {
                               1833                 :                :                     /*
                               1834                 :                :                      * We don't allow to specify row filter for both parent
                               1835                 :                :                      * and child table at the same time as it is not very
                               1836                 :                :                      * clear which one should be given preference.
                               1837                 :                :                      */
 1482 akapila@postgresql.o     1838         [ -  + ]:            607 :                     if (childrelid != myrelid &&
 1482 akapila@postgresql.o     1839   [ #  #  #  # ]:UBC           0 :                         (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
                               1840         [ #  # ]:              0 :                         ereport(ERROR,
                               1841                 :                :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1842                 :                :                                  errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
                               1843                 :                :                                         RelationGetRelationName(rel))));
                               1844                 :                : 
                               1845                 :                :                     /*
                               1846                 :                :                      * We don't allow to specify column list for both parent
                               1847                 :                :                      * and child table at the same time as it is not very
                               1848                 :                :                      * clear which one should be given preference.
                               1849                 :                :                      */
 1450 tomas.vondra@postgre     1850         [ -  + ]:CBC         607 :                     if (childrelid != myrelid &&
 1450 tomas.vondra@postgre     1851   [ #  #  #  # ]:UBC           0 :                         (t->columns || list_member_oid(relids_with_collist, childrelid)))
                               1852         [ #  # ]:              0 :                         ereport(ERROR,
                               1853                 :                :                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
                               1854                 :                :                                  errmsg("conflicting or redundant column lists for table \"%s\"",
                               1855                 :                :                                         RelationGetRelationName(rel))));
                               1856                 :                : 
 3342 peter_e@gmx.net          1857                 :CBC         607 :                     continue;
                               1858                 :                :                 }
                               1859                 :                : 
                               1860                 :                :                 /* find_all_inheritors already got lock */
 2610 andres@anarazel.de       1861                 :             12 :                 rel = table_open(childrelid, NoLock);
   95 michael@paquier.xyz      1862                 :GNC          12 :                 pub_rel = palloc_object(PublicationRelInfo);
 1651 alvherre@alvh.no-ip.     1863                 :CBC          12 :                 pub_rel->relation = rel;
                               1864                 :                :                 /* child inherits WHERE clause from parent */
 1482 akapila@postgresql.o     1865                 :             12 :                 pub_rel->whereClause = t->whereClause;
                               1866                 :                : 
                               1867                 :                :                 /* child inherits column list from parent */
 1450 tomas.vondra@postgre     1868                 :             12 :                 pub_rel->columns = t->columns;
   11 akapila@postgresql.o     1869                 :GNC          12 :                 pub_rel->except = t->except;
 1438 tomas.vondra@postgre     1870                 :CBC          12 :                 rels = lappend(rels, pub_rel);
 3342 peter_e@gmx.net          1871                 :             12 :                 relids = lappend_oid(relids, childrelid);
                               1872                 :                : 
 1482 akapila@postgresql.o     1873         [ +  + ]:             12 :                 if (t->whereClause)
                               1874                 :              1 :                     relids_with_rf = lappend_oid(relids_with_rf, childrelid);
                               1875                 :                : 
 1450 tomas.vondra@postgre     1876         [ -  + ]:             12 :                 if (t->columns)
 1450 tomas.vondra@postgre     1877                 :UBC           0 :                     relids_with_collist = lappend_oid(relids_with_collist, childrelid);
                               1878                 :                :             }
                               1879                 :                :         }
                               1880                 :                :     }
                               1881                 :                : 
 3342 peter_e@gmx.net          1882                 :CBC         673 :     list_free(relids);
 1482 akapila@postgresql.o     1883                 :            673 :     list_free(relids_with_rf);
                               1884                 :                : 
 1438 tomas.vondra@postgre     1885                 :            673 :     return rels;
                               1886                 :                : }
                               1887                 :                : 
                               1888                 :                : /*
                               1889                 :                :  * Close all relations in the list.
                               1890                 :                :  */
                               1891                 :                : static void
                               1892                 :            778 : CloseTableList(List *rels)
                               1893                 :                : {
                               1894                 :                :     ListCell   *lc;
                               1895                 :                : 
 3342 peter_e@gmx.net          1896   [ +  +  +  +  :           1596 :     foreach(lc, rels)
                                              +  + ]
                               1897                 :                :     {
                               1898                 :                :         PublicationRelInfo *pub_rel;
                               1899                 :                : 
 1651 alvherre@alvh.no-ip.     1900                 :            818 :         pub_rel = (PublicationRelInfo *) lfirst(lc);
                               1901                 :            818 :         table_close(pub_rel->relation, NoLock);
                               1902                 :                :     }
                               1903                 :                : 
 1482 akapila@postgresql.o     1904                 :            778 :     list_free_deep(rels);
 3342 peter_e@gmx.net          1905                 :            778 : }
                               1906                 :                : 
                               1907                 :                : /*
                               1908                 :                :  * Lock the schemas specified in the schema list in AccessShareLock mode in
                               1909                 :                :  * order to prevent concurrent schema deletion.
                               1910                 :                :  */
                               1911                 :                : static void
 1600 akapila@postgresql.o     1912                 :            518 : LockSchemaList(List *schemalist)
                               1913                 :                : {
                               1914                 :                :     ListCell   *lc;
                               1915                 :                : 
                               1916   [ +  +  +  +  :            680 :     foreach(lc, schemalist)
                                              +  + ]
                               1917                 :                :     {
                               1918                 :            162 :         Oid         schemaid = lfirst_oid(lc);
                               1919                 :                : 
                               1920                 :                :         /* Allow query cancel in case this takes a long time */
                               1921         [ -  + ]:            162 :         CHECK_FOR_INTERRUPTS();
                               1922                 :            162 :         LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
                               1923                 :                : 
                               1924                 :                :         /*
                               1925                 :                :          * It is possible that by the time we acquire the lock on schema,
                               1926                 :                :          * concurrent DDL has removed it. We can test this by checking the
                               1927                 :                :          * existence of schema.
                               1928                 :                :          */
                               1929         [ -  + ]:            162 :         if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
 1600 akapila@postgresql.o     1930         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1931                 :                :                     errcode(ERRCODE_UNDEFINED_SCHEMA),
                               1932                 :                :                     errmsg("schema with OID %u does not exist", schemaid));
                               1933                 :                :     }
 1600 akapila@postgresql.o     1934                 :CBC         518 : }
                               1935                 :                : 
                               1936                 :                : /*
                               1937                 :                :  * Add listed tables to the publication.
                               1938                 :                :  */
                               1939                 :                : static void
 1438 tomas.vondra@postgre     1940                 :            572 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
                               1941                 :                :                      AlterPublicationStmt *stmt)
                               1942                 :                : {
                               1943                 :                :     ListCell   *lc;
                               1944                 :                : 
 3342 peter_e@gmx.net          1945   [ +  +  +  +  :           1159 :     foreach(lc, rels)
                                              +  + ]
                               1946                 :                :     {
 1651 alvherre@alvh.no-ip.     1947                 :            624 :         PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
                               1948                 :            624 :         Relation    rel = pub_rel->relation;
                               1949                 :                :         ObjectAddress obj;
                               1950                 :                : 
                               1951                 :                :         /* Must be owner of the table or superuser. */
 1218 peter@eisentraut.org     1952         [ +  + ]:            624 :         if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
 3025 peter_e@gmx.net          1953                 :              3 :             aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
 3342                          1954                 :              3 :                            RelationGetRelationName(rel));
                               1955                 :                : 
 1651 alvherre@alvh.no-ip.     1956                 :            621 :         obj = publication_add_relation(pubid, pub_rel, if_not_exists);
 3342 peter_e@gmx.net          1957         [ +  + ]:            587 :         if (stmt)
                               1958                 :                :         {
                               1959                 :            314 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
                               1960                 :                :                                              (Node *) stmt);
                               1961                 :                : 
                               1962         [ -  + ]:            314 :             InvokeObjectPostCreateHook(PublicationRelRelationId,
                               1963                 :                :                                        obj.objectId, 0);
                               1964                 :                :         }
                               1965                 :                :     }
                               1966                 :            535 : }
                               1967                 :                : 
                               1968                 :                : /*
                               1969                 :                :  * Remove listed tables from the publication.
                               1970                 :                :  */
                               1971                 :                : static void
 1438 tomas.vondra@postgre     1972                 :            252 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
                               1973                 :                : {
                               1974                 :                :     ObjectAddress obj;
                               1975                 :                :     ListCell   *lc;
                               1976                 :                :     Oid         prid;
                               1977                 :                : 
 3342 peter_e@gmx.net          1978   [ +  +  +  +  :            483 :     foreach(lc, rels)
                                              +  + ]
                               1979                 :                :     {
 1651 alvherre@alvh.no-ip.     1980                 :            240 :         PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
                               1981                 :            240 :         Relation    rel = pubrel->relation;
 3342 peter_e@gmx.net          1982                 :            240 :         Oid         relid = RelationGetRelid(rel);
                               1983                 :                : 
 1450 tomas.vondra@postgre     1984         [ -  + ]:            240 :         if (pubrel->columns)
 1450 tomas.vondra@postgre     1985         [ #  # ]:UBC           0 :             ereport(ERROR,
                               1986                 :                :                     errcode(ERRCODE_SYNTAX_ERROR),
                               1987                 :                :                     errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
                               1988                 :                : 
 2672 andres@anarazel.de       1989                 :CBC         240 :         prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
                               1990                 :                :                                ObjectIdGetDatum(relid),
                               1991                 :                :                                ObjectIdGetDatum(pubid));
 3342 peter_e@gmx.net          1992         [ +  + ]:            240 :         if (!OidIsValid(prid))
                               1993                 :                :         {
                               1994         [ -  + ]:              6 :             if (missing_ok)
 3342 peter_e@gmx.net          1995                 :UBC           0 :                 continue;
                               1996                 :                : 
 3342 peter_e@gmx.net          1997         [ +  - ]:CBC           6 :             ereport(ERROR,
                               1998                 :                :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
                               1999                 :                :                      errmsg("relation \"%s\" is not part of the publication",
                               2000                 :                :                             RelationGetRelationName(rel))));
                               2001                 :                :         }
                               2002                 :                : 
 1482 akapila@postgresql.o     2003         [ +  + ]:            234 :         if (pubrel->whereClause)
                               2004         [ +  - ]:              3 :             ereport(ERROR,
                               2005                 :                :                     (errcode(ERRCODE_SYNTAX_ERROR),
                               2006                 :                :                      errmsg("cannot use a WHERE clause when removing a table from a publication")));
                               2007                 :                : 
 3342 peter_e@gmx.net          2008                 :            231 :         ObjectAddressSet(obj, PublicationRelRelationId, prid);
                               2009                 :            231 :         performDeletion(&obj, DROP_CASCADE, 0);
                               2010                 :                :     }
                               2011                 :            243 : }
                               2012                 :                : 
                               2013                 :                : /*
                               2014                 :                :  * Add listed schemas to the publication.
                               2015                 :                :  */
                               2016                 :                : static void
 1438 tomas.vondra@postgre     2017                 :            294 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
                               2018                 :                :                       AlterPublicationStmt *stmt)
                               2019                 :                : {
                               2020                 :                :     ListCell   *lc;
                               2021                 :                : 
 1600 akapila@postgresql.o     2022   [ +  +  +  +  :            419 :     foreach(lc, schemas)
                                              +  + ]
                               2023                 :                :     {
                               2024                 :            131 :         Oid         schemaid = lfirst_oid(lc);
                               2025                 :                :         ObjectAddress obj;
                               2026                 :                : 
 1438 tomas.vondra@postgre     2027                 :            131 :         obj = publication_add_schema(pubid, schemaid, if_not_exists);
 1600 akapila@postgresql.o     2028         [ +  + ]:            125 :         if (stmt)
                               2029                 :                :         {
                               2030                 :             34 :             EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
                               2031                 :                :                                              (Node *) stmt);
                               2032                 :                : 
                               2033         [ -  + ]:             34 :             InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
                               2034                 :                :                                        obj.objectId, 0);
                               2035                 :                :         }
                               2036                 :                :     }
                               2037                 :            288 : }
                               2038                 :                : 
                               2039                 :                : /*
                               2040                 :                :  * Remove listed schemas from the publication.
                               2041                 :                :  */
                               2042                 :                : static void
 1438 tomas.vondra@postgre     2043                 :            221 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
                               2044                 :                : {
                               2045                 :                :     ObjectAddress obj;
                               2046                 :                :     ListCell   *lc;
                               2047                 :                :     Oid         psid;
                               2048                 :                : 
 1600 akapila@postgresql.o     2049   [ +  +  +  +  :            246 :     foreach(lc, schemas)
                                              +  + ]
                               2050                 :                :     {
                               2051                 :             28 :         Oid         schemaid = lfirst_oid(lc);
                               2052                 :                : 
 1438 tomas.vondra@postgre     2053                 :             28 :         psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
                               2054                 :                :                                Anum_pg_publication_namespace_oid,
                               2055                 :                :                                ObjectIdGetDatum(schemaid),
                               2056                 :                :                                ObjectIdGetDatum(pubid));
 1600 akapila@postgresql.o     2057         [ +  + ]:             28 :         if (!OidIsValid(psid))
                               2058                 :                :         {
                               2059         [ -  + ]:              3 :             if (missing_ok)
 1600 akapila@postgresql.o     2060                 :UBC           0 :                 continue;
                               2061                 :                : 
 1600 akapila@postgresql.o     2062         [ +  - ]:CBC           3 :             ereport(ERROR,
                               2063                 :                :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
                               2064                 :                :                      errmsg("tables from schema \"%s\" are not part of the publication",
                               2065                 :                :                             get_namespace_name(schemaid))));
                               2066                 :                :         }
                               2067                 :                : 
                               2068                 :             25 :         ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
                               2069                 :             25 :         performDeletion(&obj, DROP_CASCADE, 0);
                               2070                 :                :     }
                               2071                 :            218 : }
                               2072                 :                : 
                               2073                 :                : /*
                               2074                 :                :  * Internal workhorse for changing a publication owner
                               2075                 :                :  */
                               2076                 :                : static void
 3342 peter_e@gmx.net          2077                 :             18 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
                               2078                 :                : {
                               2079                 :                :     Form_pg_publication form;
                               2080                 :                : 
                               2081                 :             18 :     form = (Form_pg_publication) GETSTRUCT(tup);
                               2082                 :                : 
                               2083         [ +  + ]:             18 :     if (form->pubowner == newOwnerId)
                               2084                 :              6 :         return;
                               2085                 :                : 
 3317                          2086         [ +  + ]:             12 :     if (!superuser())
                               2087                 :                :     {
                               2088                 :                :         AclResult   aclresult;
                               2089                 :                : 
                               2090                 :                :         /* Must be owner */
 1218 peter@eisentraut.org     2091         [ -  + ]:              6 :         if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
 3025 peter_e@gmx.net          2092                 :UBC           0 :             aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
 3317                          2093                 :              0 :                            NameStr(form->pubname));
                               2094                 :                : 
                               2095                 :                :         /* Must be able to become new owner */
 1213 rhaas@postgresql.org     2096                 :CBC           6 :         check_can_set_role(GetUserId(), newOwnerId);
                               2097                 :                : 
                               2098                 :                :         /* New owner must have CREATE privilege on database */
 1218 peter@eisentraut.org     2099                 :              6 :         aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
 3317 peter_e@gmx.net          2100         [ -  + ]:              6 :         if (aclresult != ACLCHECK_OK)
 3025 peter_e@gmx.net          2101                 :UBC           0 :             aclcheck_error(aclresult, OBJECT_DATABASE,
 3317                          2102                 :              0 :                            get_database_name(MyDatabaseId));
                               2103                 :                : 
  157 akapila@postgresql.o     2104         [ +  + ]:GNC           6 :         if (!superuser_arg(newOwnerId))
                               2105                 :                :         {
                               2106   [ +  -  +  -  :              6 :             if (form->puballtables || form->puballsequences ||
                                              +  - ]
                               2107                 :              3 :                 is_schema_publication(form->oid))
                               2108         [ +  - ]:              3 :                 ereport(ERROR,
                               2109                 :                :                         errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                               2110                 :                :                         errmsg("permission denied to change owner of publication \"%s\"",
                               2111                 :                :                                NameStr(form->pubname)),
                               2112                 :                :                         errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
                               2113                 :                :         }
                               2114                 :                :     }
                               2115                 :                : 
 3342 peter_e@gmx.net          2116                 :CBC           9 :     form->pubowner = newOwnerId;
 3330 alvherre@alvh.no-ip.     2117                 :              9 :     CatalogTupleUpdate(rel, &tup->t_self, tup);
                               2118                 :                : 
                               2119                 :                :     /* Update owner dependency reference */
 3342 peter_e@gmx.net          2120                 :              9 :     changeDependencyOnOwner(PublicationRelationId,
                               2121                 :                :                             form->oid,
                               2122                 :                :                             newOwnerId);
                               2123                 :                : 
                               2124         [ -  + ]:              9 :     InvokeObjectPostAlterHook(PublicationRelationId,
                               2125                 :                :                               form->oid, 0);
                               2126                 :                : }
                               2127                 :                : 
                               2128                 :                : /*
                               2129                 :                :  * Change publication owner -- by name
                               2130                 :                :  */
                               2131                 :                : ObjectAddress
                               2132                 :             18 : AlterPublicationOwner(const char *name, Oid newOwnerId)
                               2133                 :                : {
                               2134                 :                :     Oid         pubid;
                               2135                 :                :     HeapTuple   tup;
                               2136                 :                :     Relation    rel;
                               2137                 :                :     ObjectAddress address;
                               2138                 :                :     Form_pg_publication pubform;
                               2139                 :                : 
 2610 andres@anarazel.de       2140                 :             18 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
                               2141                 :                : 
 3342 peter_e@gmx.net          2142                 :             18 :     tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
                               2143                 :                : 
                               2144         [ -  + ]:             18 :     if (!HeapTupleIsValid(tup))
 3342 peter_e@gmx.net          2145         [ #  # ]:UBC           0 :         ereport(ERROR,
                               2146                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               2147                 :                :                  errmsg("publication \"%s\" does not exist", name)));
                               2148                 :                : 
 2672 andres@anarazel.de       2149                 :CBC          18 :     pubform = (Form_pg_publication) GETSTRUCT(tup);
  362 akapila@postgresql.o     2150                 :             18 :     pubid = pubform->oid;
                               2151                 :                : 
 3342 peter_e@gmx.net          2152                 :             18 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
                               2153                 :                : 
  362 akapila@postgresql.o     2154                 :             15 :     ObjectAddressSet(address, PublicationRelationId, pubid);
                               2155                 :                : 
 3342 peter_e@gmx.net          2156                 :             15 :     heap_freetuple(tup);
                               2157                 :                : 
 2610 andres@anarazel.de       2158                 :             15 :     table_close(rel, RowExclusiveLock);
                               2159                 :                : 
 3342 peter_e@gmx.net          2160                 :             15 :     return address;
                               2161                 :                : }
                               2162                 :                : 
                               2163                 :                : /*
                               2164                 :                :  * Change publication owner -- by OID
                               2165                 :                :  */
                               2166                 :                : void
  362 akapila@postgresql.o     2167                 :UBC           0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
                               2168                 :                : {
                               2169                 :                :     HeapTuple   tup;
                               2170                 :                :     Relation    rel;
                               2171                 :                : 
 2610 andres@anarazel.de       2172                 :              0 :     rel = table_open(PublicationRelationId, RowExclusiveLock);
                               2173                 :                : 
  362 akapila@postgresql.o     2174                 :              0 :     tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
                               2175                 :                : 
 3342 peter_e@gmx.net          2176         [ #  # ]:              0 :     if (!HeapTupleIsValid(tup))
                               2177         [ #  # ]:              0 :         ereport(ERROR,
                               2178                 :                :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                               2179                 :                :                  errmsg("publication with OID %u does not exist", pubid)));
                               2180                 :                : 
                               2181                 :              0 :     AlterPublicationOwner_internal(rel, tup, newOwnerId);
                               2182                 :                : 
                               2183                 :              0 :     heap_freetuple(tup);
                               2184                 :                : 
 2610 andres@anarazel.de       2185                 :              0 :     table_close(rel, RowExclusiveLock);
 3342 peter_e@gmx.net          2186                 :              0 : }
                               2187                 :                : 
                               2188                 :                : /*
                               2189                 :                :  * Extract the publish_generated_columns option value from a DefElem. "stored"
                               2190                 :                :  * and "none" values are accepted.
                               2191                 :                :  */
                               2192                 :                : static char
  416 akapila@postgresql.o     2193                 :CBC          38 : defGetGeneratedColsOption(DefElem *def)
                               2194                 :                : {
  222                          2195                 :             38 :     char       *sval = "";
                               2196                 :                : 
                               2197                 :                :     /*
                               2198                 :                :      * A parameter value is required.
                               2199                 :                :      */
                               2200         [ +  + ]:             38 :     if (def->arg)
                               2201                 :                :     {
                               2202                 :             35 :         sval = defGetString(def);
                               2203                 :                : 
                               2204         [ +  + ]:             35 :         if (pg_strcasecmp(sval, "none") == 0)
                               2205                 :             11 :             return PUBLISH_GENCOLS_NONE;
                               2206         [ +  + ]:             24 :         if (pg_strcasecmp(sval, "stored") == 0)
                               2207                 :             21 :             return PUBLISH_GENCOLS_STORED;
                               2208                 :                :     }
                               2209                 :                : 
  416                          2210         [ +  - ]:              6 :     ereport(ERROR,
                               2211                 :                :             errcode(ERRCODE_SYNTAX_ERROR),
                               2212                 :                :             errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
                               2213                 :                :             errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
                               2214                 :                : 
                               2215                 :                :     return PUBLISH_GENCOLS_NONE;    /* keep compiler quiet */
                               2216                 :                : }
        

Generated by: LCOV version 2.4-beta