Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * publicationcmds.c
4 : : * publication manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/publicationcmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/htup_details.h"
18 : : #include "access/table.h"
19 : : #include "access/xact.h"
20 : : #include "catalog/catalog.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/namespace.h"
23 : : #include "catalog/objectaccess.h"
24 : : #include "catalog/objectaddress.h"
25 : : #include "catalog/pg_database.h"
26 : : #include "catalog/pg_inherits.h"
27 : : #include "catalog/pg_namespace.h"
28 : : #include "catalog/pg_proc.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "commands/defrem.h"
33 : : #include "commands/event_trigger.h"
34 : : #include "commands/publicationcmds.h"
35 : : #include "miscadmin.h"
36 : : #include "nodes/nodeFuncs.h"
37 : : #include "parser/parse_clause.h"
38 : : #include "parser/parse_collate.h"
39 : : #include "parser/parse_relation.h"
40 : : #include "rewrite/rewriteHandler.h"
41 : : #include "storage/lmgr.h"
42 : : #include "utils/acl.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/lsyscache.h"
46 : : #include "utils/rel.h"
47 : : #include "utils/syscache.h"
48 : : #include "utils/varlena.h"
49 : :
50 : :
51 : : /*
52 : : * Information used to validate the columns in the row filter expression. See
53 : : * contain_invalid_rfcolumn_walker for details.
54 : : */
55 : : typedef struct rf_context
56 : : {
57 : : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : : bool pubviaroot; /* true if we are validating the parent
59 : : * relation's row filter */
60 : : Oid relid; /* relid of the relation */
61 : : Oid parentid; /* relid of the parent relation */
62 : : } rf_context;
63 : :
64 : : static List *OpenTableList(List *tables);
65 : : static void CloseTableList(List *rels);
66 : : static void LockSchemaList(List *schemalist);
67 : : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : : AlterPublicationStmt *stmt);
69 : : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : : AlterPublicationStmt *stmt);
72 : : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : : static char defGetGeneratedColsOption(DefElem *def);
74 : :
75 : :
76 : : static void
1704 dean.a.rasheed@gmail 77 :CBC 537 : parse_publication_options(ParseState *pstate,
78 : : List *options,
79 : : bool *publish_given,
80 : : PublicationActions *pubactions,
81 : : bool *publish_via_partition_root_given,
82 : : bool *publish_via_partition_root,
83 : : bool *publish_generated_columns_given,
84 : : char *publish_generated_columns)
85 : : {
86 : : ListCell *lc;
87 : :
3229 peter_e@gmx.net 88 : 537 : *publish_given = false;
2167 peter@eisentraut.org 89 : 537 : *publish_via_partition_root_given = false;
493 akapila@postgresql.o 90 : 537 : *publish_generated_columns_given = false;
91 : :
92 : : /* defaults */
2167 peter@eisentraut.org 93 : 537 : pubactions->pubinsert = true;
94 : 537 : pubactions->pubupdate = true;
95 : 537 : pubactions->pubdelete = true;
96 : 537 : pubactions->pubtruncate = true;
97 : 537 : *publish_via_partition_root = false;
416 akapila@postgresql.o 98 : 537 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 : :
100 : : /* Parse options */
3224 bruce@momjian.us 101 [ + + + + : 721 : foreach(lc, options)
+ + ]
102 : : {
3342 peter_e@gmx.net 103 : 202 : DefElem *defel = (DefElem *) lfirst(lc);
104 : :
3229 105 [ + + ]: 202 : if (strcmp(defel->defname, "publish") == 0)
106 : : {
107 : : char *publish;
108 : : List *publish_list;
109 : : ListCell *lc2;
110 : :
111 [ - + ]: 70 : if (*publish_given)
1704 dean.a.rasheed@gmail 112 :UBC 0 : errorConflictingDefElem(defel, pstate);
113 : :
114 : : /*
115 : : * If publish option was given only the explicitly listed actions
116 : : * should be published.
117 : : */
2167 peter@eisentraut.org 118 :CBC 70 : pubactions->pubinsert = false;
119 : 70 : pubactions->pubupdate = false;
120 : 70 : pubactions->pubdelete = false;
121 : 70 : pubactions->pubtruncate = false;
122 : :
3229 peter_e@gmx.net 123 : 70 : *publish_given = true;
124 : :
125 : : /*
126 : : * SplitIdentifierString destructively modifies its input, so make
127 : : * a copy so we don't modify the memory of the executing statement
128 : : */
68 drowley@postgresql.o 129 : 70 : publish = pstrdup(defGetString(defel));
130 : :
3229 peter_e@gmx.net 131 [ - + ]: 70 : if (!SplitIdentifierString(publish, ',', &publish_list))
3342 peter_e@gmx.net 132 [ # # ]:UBC 0 : ereport(ERROR,
133 : : (errcode(ERRCODE_SYNTAX_ERROR),
134 : : errmsg("invalid list syntax in parameter \"%s\"",
135 : : "publish")));
136 : :
137 : : /* Process the option list. */
1257 drowley@postgresql.o 138 [ + + + + :CBC 167 : foreach(lc2, publish_list)
+ + ]
139 : : {
140 : 100 : char *publish_opt = (char *) lfirst(lc2);
141 : :
3229 peter_e@gmx.net 142 [ + + ]: 100 : if (strcmp(publish_opt, "insert") == 0)
2167 peter@eisentraut.org 143 : 62 : pubactions->pubinsert = true;
3229 peter_e@gmx.net 144 [ + + ]: 38 : else if (strcmp(publish_opt, "update") == 0)
2167 peter@eisentraut.org 145 : 15 : pubactions->pubupdate = true;
3229 peter_e@gmx.net 146 [ + + ]: 23 : else if (strcmp(publish_opt, "delete") == 0)
2167 peter@eisentraut.org 147 : 10 : pubactions->pubdelete = true;
2899 peter_e@gmx.net 148 [ + + ]: 13 : else if (strcmp(publish_opt, "truncate") == 0)
2167 peter@eisentraut.org 149 : 10 : pubactions->pubtruncate = true;
150 : : else
3229 peter_e@gmx.net 151 [ + - ]: 3 : ereport(ERROR,
152 : : (errcode(ERRCODE_SYNTAX_ERROR),
153 : : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
154 : : "publish", publish_opt)));
155 : : }
156 : : }
2167 peter@eisentraut.org 157 [ + + ]: 132 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
158 : : {
159 [ + + ]: 88 : if (*publish_via_partition_root_given)
1704 dean.a.rasheed@gmail 160 : 3 : errorConflictingDefElem(defel, pstate);
2167 peter@eisentraut.org 161 : 85 : *publish_via_partition_root_given = true;
162 : 85 : *publish_via_partition_root = defGetBoolean(defel);
163 : : }
493 akapila@postgresql.o 164 [ + + ]: 44 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
165 : : {
166 [ + + ]: 41 : if (*publish_generated_columns_given)
167 : 3 : errorConflictingDefElem(defel, pstate);
168 : 38 : *publish_generated_columns_given = true;
416 169 : 38 : *publish_generated_columns = defGetGeneratedColsOption(defel);
170 : : }
171 : : else
3229 peter_e@gmx.net 172 [ + - ]: 3 : ereport(ERROR,
173 : : (errcode(ERRCODE_SYNTAX_ERROR),
174 : : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
175 : : }
3342 176 : 519 : }
177 : :
178 : : /*
179 : : * Convert the PublicationObjSpecType list into schema oid list and
180 : : * PublicationTable list.
181 : : */
182 : : static void
1600 akapila@postgresql.o 183 : 944 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
184 : : List **rels, List **exceptrels, List **schemas)
185 : : {
186 : : ListCell *cell;
187 : : PublicationObjSpec *pubobj;
188 : :
189 [ + + ]: 944 : if (!pubobjspec_list)
190 : 116 : return;
191 : :
192 [ + - + + : 1760 : foreach(cell, pubobjspec_list)
+ + ]
193 : : {
194 : : Oid schemaid;
195 : : List *search_path;
196 : :
197 : 950 : pubobj = (PublicationObjSpec *) lfirst(cell);
198 : :
199 [ + + + + : 950 : switch (pubobj->pubobjtype)
- ]
200 : : {
11 akapila@postgresql.o 201 :GNC 34 : case PUBLICATIONOBJ_EXCEPT_TABLE:
202 : 34 : pubobj->pubtable->except = true;
203 : 34 : *exceptrels = lappend(*exceptrels, pubobj->pubtable);
204 : 34 : break;
1600 akapila@postgresql.o 205 :CBC 718 : case PUBLICATIONOBJ_TABLE:
11 akapila@postgresql.o 206 :GNC 718 : pubobj->pubtable->except = false;
1438 tomas.vondra@postgre 207 :CBC 718 : *rels = lappend(*rels, pubobj->pubtable);
1600 akapila@postgresql.o 208 : 718 : break;
1536 alvherre@alvh.no-ip. 209 : 186 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
1600 akapila@postgresql.o 210 : 186 : schemaid = get_namespace_oid(pubobj->name, false);
211 : :
212 : : /* Filter out duplicates if user specifies "sch1, sch1" */
1438 tomas.vondra@postgre 213 : 171 : *schemas = list_append_unique_oid(*schemas, schemaid);
1600 akapila@postgresql.o 214 : 171 : break;
1536 alvherre@alvh.no-ip. 215 : 12 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
1600 akapila@postgresql.o 216 : 12 : search_path = fetch_search_path(false);
217 [ + + ]: 12 : if (search_path == NIL) /* nothing valid in search_path? */
218 [ + - ]: 3 : ereport(ERROR,
219 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
220 : : errmsg("no schema has been selected for CURRENT_SCHEMA"));
221 : :
222 : 9 : schemaid = linitial_oid(search_path);
223 : 9 : list_free(search_path);
224 : :
225 : : /* Filter out duplicates if user specifies "sch1, sch1" */
1438 tomas.vondra@postgre 226 : 9 : *schemas = list_append_unique_oid(*schemas, schemaid);
1600 akapila@postgresql.o 227 : 9 : break;
1600 akapila@postgresql.o 228 :UBC 0 : default:
229 : : /* shouldn't happen */
230 [ # # ]: 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
231 : : break;
232 : : }
233 : : }
234 : : }
235 : :
236 : : /*
237 : : * Returns true if any of the columns used in the row filter WHERE expression is
238 : : * not part of REPLICA IDENTITY, false otherwise.
239 : : */
240 : : static bool
1482 akapila@postgresql.o 241 :CBC 129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
242 : : {
243 [ - + ]: 129 : if (node == NULL)
1482 akapila@postgresql.o 244 :UBC 0 : return false;
245 : :
1482 akapila@postgresql.o 246 [ + + ]:CBC 129 : if (IsA(node, Var))
247 : : {
248 : 52 : Var *var = (Var *) node;
249 : 52 : AttrNumber attnum = var->varattno;
250 : :
251 : : /*
252 : : * If pubviaroot is true, we are validating the row filter of the
253 : : * parent table, but the bitmap contains the replica identity
254 : : * information of the child table. So, get the column number of the
255 : : * child table as parent and child column order could be different.
256 : : */
257 [ + + ]: 52 : if (context->pubviaroot)
258 : : {
259 : 8 : char *colname = get_attname(context->parentid, attnum, false);
260 : :
261 : 8 : attnum = get_attnum(context->relid, colname);
262 : : }
263 : :
264 [ + + ]: 52 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
265 : 52 : context->bms_replident))
266 : 30 : return true;
267 : : }
268 : :
269 : 99 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
270 : : context);
271 : : }
272 : :
273 : : /*
274 : : * Check if all columns referenced in the filter expression are part of the
275 : : * REPLICA IDENTITY index or not.
276 : : *
277 : : * Returns true if any invalid column is found.
278 : : */
279 : : bool
1450 tomas.vondra@postgre 280 : 364 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
281 : : bool pubviaroot)
282 : : {
283 : : HeapTuple rftuple;
1482 akapila@postgresql.o 284 : 364 : Oid relid = RelationGetRelid(relation);
285 : 364 : Oid publish_as_relid = RelationGetRelid(relation);
286 : 364 : bool result = false;
287 : : Datum rfdatum;
288 : : bool rfisnull;
289 : :
290 : : /*
291 : : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
292 : : * allowed in the row filter and we can skip the validation.
293 : : */
294 [ + + ]: 364 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
295 : 99 : return false;
296 : :
297 : : /*
298 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
299 : : * is published via this publication as we need to use its row filter
300 : : * expression to filter the partition's changes.
301 : : *
302 : : * Note that even though the row filter used is for an ancestor, the
303 : : * REPLICA IDENTITY used will be for the actual child table.
304 : : */
305 [ + + + + ]: 265 : if (pubviaroot && relation->rd_rel->relispartition)
306 : : {
307 : : publish_as_relid
1460 tomas.vondra@postgre 308 : 54 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
309 : :
1482 akapila@postgresql.o 310 [ + + ]: 54 : if (!OidIsValid(publish_as_relid))
311 : 3 : publish_as_relid = relid;
312 : : }
313 : :
314 : 265 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
315 : : ObjectIdGetDatum(publish_as_relid),
316 : : ObjectIdGetDatum(pubid));
317 : :
318 [ + + ]: 265 : if (!HeapTupleIsValid(rftuple))
319 : 28 : return false;
320 : :
321 : 237 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
322 : : Anum_pg_publication_rel_prqual,
323 : : &rfisnull);
324 : :
325 [ + + ]: 237 : if (!rfisnull)
326 : : {
327 : 51 : rf_context context = {0};
328 : : Node *rfnode;
329 : 51 : Bitmapset *bms = NULL;
330 : :
331 : 51 : context.pubviaroot = pubviaroot;
332 : 51 : context.parentid = publish_as_relid;
333 : 51 : context.relid = relid;
334 : :
335 : : /* Remember columns that are part of the REPLICA IDENTITY */
336 : 51 : bms = RelationGetIndexAttrBitmap(relation,
337 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
338 : :
339 : 51 : context.bms_replident = bms;
340 : 51 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
341 : 51 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
342 : : }
343 : :
344 : 237 : ReleaseSysCache(rftuple);
345 : :
346 : 237 : return result;
347 : : }
348 : :
349 : : /*
350 : : * Check for invalid columns in the publication table definition.
351 : : *
352 : : * This function evaluates two conditions:
353 : : *
354 : : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
355 : : * by the column list. If any column is missing, *invalid_column_list is set
356 : : * to true.
357 : : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
358 : : * are published, either by being explicitly named in the column list or, if
359 : : * no column list is specified, by setting the option
360 : : * publish_generated_columns to stored. If any unpublished
361 : : * generated column is found, *invalid_gen_col is set to true.
362 : : *
363 : : * Returns true if any of the above conditions are not met.
364 : : */
365 : : bool
466 366 : 472 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
367 : : bool pubviaroot, char pubgencols_type,
368 : : bool *invalid_column_list,
369 : : bool *invalid_gen_col)
370 : : {
1450 tomas.vondra@postgre 371 : 472 : Oid relid = RelationGetRelid(relation);
372 : 472 : Oid publish_as_relid = RelationGetRelid(relation);
373 : : Bitmapset *idattrs;
466 akapila@postgresql.o 374 : 472 : Bitmapset *columns = NULL;
375 : 472 : TupleDesc desc = RelationGetDescr(relation);
376 : : Publication *pub;
377 : : int x;
378 : :
379 : 472 : *invalid_column_list = false;
380 : 472 : *invalid_gen_col = false;
381 : :
382 : : /*
383 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
384 : : * is published via this publication as we need to use its column list for
385 : : * the changes.
386 : : *
387 : : * Note that even though the column list used is for an ancestor, the
388 : : * REPLICA IDENTITY used will be for the actual child table.
389 : : */
1450 tomas.vondra@postgre 390 [ + + + + ]: 472 : if (pubviaroot && relation->rd_rel->relispartition)
391 : : {
392 : 80 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
393 : :
394 [ + + ]: 80 : if (!OidIsValid(publish_as_relid))
395 : 18 : publish_as_relid = relid;
396 : : }
397 : :
398 : : /* Fetch the column list */
466 akapila@postgresql.o 399 : 472 : pub = GetPublication(pubid);
400 : 472 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
401 : :
402 [ + + ]: 472 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
403 : : {
404 : : /* With REPLICA IDENTITY FULL, no column list is allowed. */
405 : 106 : *invalid_column_list = (columns != NULL);
406 : :
407 : : /*
408 : : * As we don't allow a column list with REPLICA IDENTITY FULL, the
409 : : * publish_generated_columns option must be set to stored if the table
410 : : * has any stored generated columns.
411 : : */
416 412 [ + + ]: 106 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
466 413 [ + + ]: 100 : relation->rd_att->constr &&
414 [ + + ]: 44 : relation->rd_att->constr->has_generated_stored)
415 : 3 : *invalid_gen_col = true;
416 : :
417 : : /*
418 : : * Virtual generated columns are currently not supported for logical
419 : : * replication at all.
420 : : */
401 peter@eisentraut.org 421 [ + + ]: 106 : if (relation->rd_att->constr &&
422 [ + + ]: 50 : relation->rd_att->constr->has_generated_virtual)
423 : 6 : *invalid_gen_col = true;
424 : :
466 akapila@postgresql.o 425 [ + + - + ]: 106 : if (*invalid_gen_col && *invalid_column_list)
466 akapila@postgresql.o 426 :UBC 0 : return true;
427 : : }
428 : :
429 : : /* Remember columns that are part of the REPLICA IDENTITY */
466 akapila@postgresql.o 430 :CBC 472 : idattrs = RelationGetIndexAttrBitmap(relation,
431 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
432 : :
433 : : /*
434 : : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
435 : : * (to handle system columns the usual way), while column list does not
436 : : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
437 : : * over the idattrs and check all of them are in the list.
438 : : */
439 : 472 : x = -1;
440 [ + + ]: 801 : while ((x = bms_next_member(idattrs, x)) >= 0)
441 : : {
442 : 332 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
443 : 332 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
444 : :
445 [ + + ]: 332 : if (columns == NULL)
446 : : {
447 : : /*
448 : : * The publish_generated_columns option must be set to stored if
449 : : * the REPLICA IDENTITY contains any stored generated column.
450 : : */
401 peter@eisentraut.org 451 [ + + + - ]: 227 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
452 : : {
453 : 3 : *invalid_gen_col = true;
454 : 3 : break;
455 : : }
456 : :
457 : : /*
458 : : * The equivalent setting for virtual generated columns does not
459 : : * exist yet.
460 : : */
461 [ - + ]: 224 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
462 : : {
466 akapila@postgresql.o 463 :UBC 0 : *invalid_gen_col = true;
464 : 0 : break;
465 : : }
466 : :
467 : : /* Skip validating the column list since it is not defined */
466 akapila@postgresql.o 468 :CBC 224 : continue;
469 : : }
470 : :
471 : : /*
472 : : * If pubviaroot is true, we are validating the column list of the
473 : : * parent table, but the bitmap contains the replica identity
474 : : * information of the child table. The parent/child attnums may not
475 : : * match, so translate them to the parent - get the attname from the
476 : : * child, and look it up in the parent.
477 : : */
478 [ + + ]: 105 : if (pubviaroot)
479 : : {
480 : : /* attribute name in the child table */
481 : 40 : char *colname = get_attname(relid, attnum, false);
482 : :
483 : : /*
484 : : * Determine the attnum for the attribute name in parent (we are
485 : : * using the column list defined on the parent).
486 : : */
487 : 40 : attnum = get_attnum(publish_as_relid, colname);
488 : : }
489 : :
490 : : /* replica identity column, not covered by the column list */
491 : 105 : *invalid_column_list |= !bms_is_member(attnum, columns);
492 : :
493 [ + + - + ]: 105 : if (*invalid_column_list && *invalid_gen_col)
466 akapila@postgresql.o 494 :UBC 0 : break;
495 : : }
496 : :
466 akapila@postgresql.o 497 :CBC 472 : bms_free(columns);
498 : 472 : bms_free(idattrs);
499 : :
500 [ + + + + ]: 472 : return *invalid_column_list || *invalid_gen_col;
501 : : }
502 : :
503 : : /*
504 : : * Invalidate entries in the RelationSyncCache for relations included in the
505 : : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
506 : : *
507 : : * If 'puballtables' is true, invalidate all cache entries.
508 : : */
509 : : void
367 510 : 18 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
511 : : {
512 [ + + ]: 18 : if (puballtables)
513 : : {
514 : 3 : CacheInvalidateRelSyncAll();
515 : : }
516 : : else
517 : : {
518 : 15 : List *relids = NIL;
519 : 15 : List *schemarelids = NIL;
520 : :
521 : : /*
522 : : * For partitioned tables, we must invalidate all partitions and
523 : : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
524 : : * a target. However, WAL records for TRUNCATE specify both a root and
525 : : * its leaves.
526 : : */
11 akapila@postgresql.o 527 :GNC 15 : relids = GetIncludedPublicationRelations(pubid,
528 : : PUBLICATION_PART_ALL);
367 akapila@postgresql.o 529 :CBC 15 : schemarelids = GetAllSchemaPublicationRelations(pubid,
530 : : PUBLICATION_PART_ALL);
531 : :
532 : 15 : relids = list_concat_unique_oid(relids, schemarelids);
533 : :
534 : : /* Invalidate the relsyncache */
535 [ + + + + : 33 : foreach_oid(relid, relids)
+ + ]
536 : 3 : CacheInvalidateRelSync(relid);
537 : : }
538 : :
539 : 18 : return;
540 : : }
541 : :
542 : : /* check_functions_in_node callback */
543 : : static bool
1482 544 : 218 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
545 : : {
546 [ + + + + ]: 218 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
547 : : func_id >= FirstNormalObjectId);
548 : : }
549 : :
550 : : /*
551 : : * The row filter walker checks if the row filter expression is a "simple
552 : : * expression".
553 : : *
554 : : * It allows only simple or compound expressions such as:
555 : : * - (Var Op Const)
556 : : * - (Var Op Var)
557 : : * - (Var Op Const) AND/OR (Var Op Const)
558 : : * - etc
559 : : * (where Var is a column of the table this filter belongs to)
560 : : *
561 : : * The simple expression has the following restrictions:
562 : : * - User-defined operators are not allowed;
563 : : * - User-defined functions are not allowed;
564 : : * - User-defined types are not allowed;
565 : : * - User-defined collations are not allowed;
566 : : * - Non-immutable built-in functions are not allowed;
567 : : * - System columns are not allowed.
568 : : *
569 : : * NOTES
570 : : *
571 : : * We don't allow user-defined functions/operators/types/collations because
572 : : * (a) if a user drops a user-defined object used in a row filter expression or
573 : : * if there is any other error while using it, the logical decoding
574 : : * infrastructure won't be able to recover from such an error even if the
575 : : * object is recreated again because a historic snapshot is used to evaluate
576 : : * the row filter;
577 : : * (b) a user-defined function can be used to access tables that could have
578 : : * unpleasant results because a historic snapshot is used. That's why only
579 : : * immutable built-in functions are allowed in row filter expressions.
580 : : *
581 : : * We don't allow system columns because currently, we don't have that
582 : : * information in the tuple passed to downstream. Also, as we don't replicate
583 : : * those to subscribers, there doesn't seem to be a need for a filter on those
584 : : * columns.
585 : : *
586 : : * We can allow other node types after more analysis and testing.
587 : : */
588 : : static bool
589 : 720 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
590 : : {
591 : 720 : char *errdetail_msg = NULL;
592 : :
593 [ + + ]: 720 : if (node == NULL)
594 : 3 : return false;
595 : :
596 [ + + + + : 717 : switch (nodeTag(node))
+ + ]
597 : : {
598 : 194 : case T_Var:
599 : : /* System columns are not allowed. */
600 [ + + ]: 194 : if (((Var *) node)->varattno < InvalidAttrNumber)
601 : 3 : errdetail_msg = _("System columns are not allowed.");
602 : 194 : break;
603 : 196 : case T_OpExpr:
604 : : case T_DistinctExpr:
605 : : case T_NullIfExpr:
606 : : /* OK, except user-defined operators are not allowed. */
607 [ + + ]: 196 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
608 : 3 : errdetail_msg = _("User-defined operators are not allowed.");
609 : 196 : break;
610 : 3 : case T_ScalarArrayOpExpr:
611 : : /* OK, except user-defined operators are not allowed. */
612 [ - + ]: 3 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
1482 akapila@postgresql.o 613 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
614 : :
615 : : /*
616 : : * We don't need to check the hashfuncid and negfuncid of
617 : : * ScalarArrayOpExpr as those functions are only built for a
618 : : * subquery.
619 : : */
1482 akapila@postgresql.o 620 :CBC 3 : break;
621 : 3 : case T_RowCompareExpr:
622 : : {
623 : : ListCell *opid;
624 : :
625 : : /* OK, except user-defined operators are not allowed. */
626 [ + - + + : 9 : foreach(opid, ((RowCompareExpr *) node)->opnos)
+ + ]
627 : : {
628 [ - + ]: 6 : if (lfirst_oid(opid) >= FirstNormalObjectId)
629 : : {
1482 akapila@postgresql.o 630 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
631 : 0 : break;
632 : : }
633 : : }
634 : : }
1482 akapila@postgresql.o 635 :CBC 3 : break;
636 : 318 : case T_Const:
637 : : case T_FuncExpr:
638 : : case T_BoolExpr:
639 : : case T_RelabelType:
640 : : case T_CollateExpr:
641 : : case T_CaseExpr:
642 : : case T_CaseTestExpr:
643 : : case T_ArrayExpr:
644 : : case T_RowExpr:
645 : : case T_CoalesceExpr:
646 : : case T_MinMaxExpr:
647 : : case T_XmlExpr:
648 : : case T_NullTest:
649 : : case T_BooleanTest:
650 : : case T_List:
651 : : /* OK, supported */
652 : 318 : break;
653 : 3 : default:
1268 peter@eisentraut.org 654 : 3 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
1482 akapila@postgresql.o 655 : 3 : break;
656 : : }
657 : :
658 : : /*
659 : : * For all the supported nodes, if we haven't already found a problem,
660 : : * check the types, functions, and collations used in it. We check List
661 : : * by walking through each element.
662 : : */
1264 alvherre@alvh.no-ip. 663 [ + + + + ]: 717 : if (!errdetail_msg && !IsA(node, List))
664 : : {
665 [ + + ]: 681 : if (exprType(node) >= FirstNormalObjectId)
666 : 3 : errdetail_msg = _("User-defined types are not allowed.");
667 [ + + ]: 678 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
668 : : pstate))
669 : 6 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
670 [ + - + + ]: 1344 : else if (exprCollation(node) >= FirstNormalObjectId ||
671 : 672 : exprInputCollation(node) >= FirstNormalObjectId)
672 : 3 : errdetail_msg = _("User-defined collations are not allowed.");
673 : : }
674 : :
675 : : /*
676 : : * If we found a problem in this node, throw error now. Otherwise keep
677 : : * going.
678 : : */
1482 akapila@postgresql.o 679 [ + + ]: 717 : if (errdetail_msg)
680 [ + - ]: 21 : ereport(ERROR,
681 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
682 : : errmsg("invalid publication WHERE expression"),
683 : : errdetail_internal("%s", errdetail_msg),
684 : : parser_errposition(pstate, exprLocation(node))));
685 : :
686 : 696 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
687 : : pstate);
688 : : }
689 : :
690 : : /*
691 : : * Check if the row filter expression is a "simple expression".
692 : : *
693 : : * See check_simple_rowfilter_expr_walker for details.
694 : : */
695 : : static bool
696 : 188 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
697 : : {
698 : 188 : return check_simple_rowfilter_expr_walker(node, pstate);
699 : : }
700 : :
701 : : /*
702 : : * Transform the publication WHERE expression for all the relations in the list,
703 : : * ensuring it is coerced to boolean and necessary collation information is
704 : : * added if required, and add a new nsitem/RTE for the associated relation to
705 : : * the ParseState's namespace list.
706 : : *
707 : : * Also check the publication row filter expression and throw an error if
708 : : * anything not permitted or unexpected is encountered.
709 : : */
710 : : static void
711 : 595 : TransformPubWhereClauses(List *tables, const char *queryString,
712 : : bool pubviaroot)
713 : : {
714 : : ListCell *lc;
715 : :
716 [ + + + + : 1198 : foreach(lc, tables)
+ + ]
717 : : {
718 : : ParseNamespaceItem *nsitem;
719 : 633 : Node *whereclause = NULL;
720 : : ParseState *pstate;
721 : 633 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
722 : :
723 [ + + ]: 633 : if (pri->whereClause == NULL)
724 : 436 : continue;
725 : :
726 : : /*
727 : : * If the publication doesn't publish changes via the root partitioned
728 : : * table, the partition's row filter will be used. So disallow using
729 : : * WHERE clause on partitioned table in this case.
730 : : */
731 [ + + ]: 197 : if (!pubviaroot &&
732 [ + + ]: 182 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
733 [ + - ]: 3 : ereport(ERROR,
734 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
735 : : errmsg("cannot use publication WHERE clause for relation \"%s\"",
736 : : RelationGetRelationName(pri->relation)),
737 : : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
738 : : "publish_via_partition_root")));
739 : :
740 : : /*
741 : : * A fresh pstate is required so that we only have "this" table in its
742 : : * rangetable
743 : : */
744 : 194 : pstate = make_parsestate(NULL);
745 : 194 : pstate->p_sourcetext = queryString;
746 : 194 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
747 : : AccessShareLock, NULL,
748 : : false, false);
749 : 194 : addNSItemToQuery(pstate, nsitem, false, true, true);
750 : :
751 : 194 : whereclause = transformWhereClause(pstate,
752 : 194 : copyObject(pri->whereClause),
753 : : EXPR_KIND_WHERE,
754 : : "PUBLICATION WHERE");
755 : :
756 : : /* Fix up collation information */
757 : 188 : assign_expr_collations(pstate, whereclause);
758 : :
401 peter@eisentraut.org 759 : 188 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
760 : :
761 : : /*
762 : : * We allow only simple expressions in row filters. See
763 : : * check_simple_rowfilter_expr_walker.
764 : : */
1482 akapila@postgresql.o 765 : 188 : check_simple_rowfilter_expr(whereclause, pstate);
766 : :
767 : 167 : free_parsestate(pstate);
768 : :
769 : 167 : pri->whereClause = whereclause;
770 : : }
771 : 565 : }
772 : :
773 : :
774 : : /*
775 : : * Given a list of tables that are going to be added to a publication,
776 : : * verify that they fulfill the necessary preconditions, namely: no tables
777 : : * have a column list if any schema is published; and partitioned tables do
778 : : * not have column lists if publish_via_partition_root is not set.
779 : : *
780 : : * 'publish_schema' indicates that the publication contains any TABLES IN
781 : : * SCHEMA elements (newly added in this command, or preexisting).
782 : : * 'pubviaroot' is the value of publish_via_partition_root.
783 : : */
784 : : static void
1265 alvherre@alvh.no-ip. 785 : 565 : CheckPubRelationColumnList(char *pubname, List *tables,
786 : : bool publish_schema, bool pubviaroot)
787 : : {
788 : : ListCell *lc;
789 : :
1450 tomas.vondra@postgre 790 [ + + + + : 1153 : foreach(lc, tables)
+ + ]
791 : : {
792 : 603 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
793 : :
794 [ + + ]: 603 : if (pri->columns == NIL)
795 : 401 : continue;
796 : :
797 : : /*
798 : : * Disallow specifying column list if any schema is in the
799 : : * publication.
800 : : *
801 : : * XXX We could instead just forbid the case when the publication
802 : : * tries to publish the table with a column list and a schema for that
803 : : * table. However, if we do that then we need a restriction during
804 : : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
805 : : * seem to be a good idea.
806 : : */
1269 akapila@postgresql.o 807 [ + + ]: 202 : if (publish_schema)
808 [ + - ]: 12 : ereport(ERROR,
809 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
810 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
811 : : get_namespace_name(RelationGetNamespace(pri->relation)),
812 : : RelationGetRelationName(pri->relation), pubname),
813 : : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
814 : :
815 : : /*
816 : : * If the publication doesn't publish changes via the root partitioned
817 : : * table, the partition's column list will be used. So disallow using
818 : : * a column list on the partitioned table in this case.
819 : : */
1450 tomas.vondra@postgre 820 [ + + ]: 190 : if (!pubviaroot &&
821 [ + + ]: 152 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
822 [ + - ]: 3 : ereport(ERROR,
823 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
824 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
825 : : get_namespace_name(RelationGetNamespace(pri->relation)),
826 : : RelationGetRelationName(pri->relation), pubname),
827 : : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
828 : : "publish_via_partition_root")));
829 : : }
830 : 550 : }
831 : :
832 : : /*
833 : : * Create new publication.
834 : : */
835 : : ObjectAddress
1704 dean.a.rasheed@gmail 836 : 479 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
837 : : {
838 : : Relation rel;
839 : : ObjectAddress myself;
840 : : Oid puboid;
841 : : bool nulls[Natts_pg_publication];
842 : : Datum values[Natts_pg_publication];
843 : : HeapTuple tup;
844 : : bool publish_given;
845 : : PublicationActions pubactions;
846 : : bool publish_via_partition_root_given;
847 : : bool publish_via_partition_root;
848 : : bool publish_generated_columns_given;
849 : : char publish_generated_columns;
850 : : AclResult aclresult;
1438 tomas.vondra@postgre 851 : 479 : List *relations = NIL;
11 akapila@postgresql.o 852 :GNC 479 : List *exceptrelations = NIL;
1438 tomas.vondra@postgre 853 :CBC 479 : List *schemaidlist = NIL;
854 : :
855 : : /* must have CREATE privilege on database */
1218 peter@eisentraut.org 856 : 479 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
3342 peter_e@gmx.net 857 [ + + ]: 479 : if (aclresult != ACLCHECK_OK)
3025 858 : 3 : aclcheck_error(aclresult, OBJECT_DATABASE,
3342 859 : 3 : get_database_name(MyDatabaseId));
860 : :
861 : : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
157 akapila@postgresql.o 862 [ + + ]:GNC 476 : if (!superuser())
863 : : {
864 [ + - - + ]: 12 : if (stmt->for_all_tables || stmt->for_all_sequences)
157 akapila@postgresql.o 865 [ # # ]:UNC 0 : ereport(ERROR,
866 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
867 : : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
868 : : }
869 : :
2610 andres@anarazel.de 870 :CBC 476 : rel = table_open(PublicationRelationId, RowExclusiveLock);
871 : :
872 : : /* Check if name is used */
2672 873 : 476 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
874 : : CStringGetDatum(stmt->pubname));
3342 peter_e@gmx.net 875 [ + + ]: 476 : if (OidIsValid(puboid))
876 [ + - ]: 3 : ereport(ERROR,
877 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
878 : : errmsg("publication \"%s\" already exists",
879 : : stmt->pubname)));
880 : :
881 : : /* Form a tuple. */
882 : 473 : memset(values, 0, sizeof(values));
883 : 473 : memset(nulls, false, sizeof(nulls));
884 : :
885 : 473 : values[Anum_pg_publication_pubname - 1] =
886 : 473 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
887 : 473 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
888 : :
1704 dean.a.rasheed@gmail 889 : 473 : parse_publication_options(pstate,
890 : : stmt->options,
891 : : &publish_given, &pubactions,
892 : : &publish_via_partition_root_given,
893 : : &publish_via_partition_root,
894 : : &publish_generated_columns_given,
895 : : &publish_generated_columns);
896 : :
157 akapila@postgresql.o 897 [ + + ]:GNC 455 : if (stmt->for_all_sequences &&
898 [ + + + - : 14 : (publish_given || publish_via_partition_root_given ||
- + ]
899 : : publish_generated_columns_given))
900 [ + - ]: 1 : ereport(NOTICE,
901 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
902 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
903 : :
2672 andres@anarazel.de 904 :CBC 455 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
905 : : Anum_pg_publication_oid);
906 : 455 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
3342 peter_e@gmx.net 907 : 455 : values[Anum_pg_publication_puballtables - 1] =
1438 tomas.vondra@postgre 908 : 455 : BoolGetDatum(stmt->for_all_tables);
157 akapila@postgresql.o 909 :GNC 455 : values[Anum_pg_publication_puballsequences - 1] =
910 : 455 : BoolGetDatum(stmt->for_all_sequences);
3342 peter_e@gmx.net 911 :CBC 455 : values[Anum_pg_publication_pubinsert - 1] =
2167 peter@eisentraut.org 912 : 455 : BoolGetDatum(pubactions.pubinsert);
3342 peter_e@gmx.net 913 : 455 : values[Anum_pg_publication_pubupdate - 1] =
2167 peter@eisentraut.org 914 : 455 : BoolGetDatum(pubactions.pubupdate);
3342 peter_e@gmx.net 915 : 455 : values[Anum_pg_publication_pubdelete - 1] =
2167 peter@eisentraut.org 916 : 455 : BoolGetDatum(pubactions.pubdelete);
2899 peter_e@gmx.net 917 : 455 : values[Anum_pg_publication_pubtruncate - 1] =
2167 peter@eisentraut.org 918 : 455 : BoolGetDatum(pubactions.pubtruncate);
919 : 455 : values[Anum_pg_publication_pubviaroot - 1] =
920 : 455 : BoolGetDatum(publish_via_partition_root);
411 akapila@postgresql.o 921 : 455 : values[Anum_pg_publication_pubgencols - 1] =
416 922 : 455 : CharGetDatum(publish_generated_columns);
923 : :
3342 peter_e@gmx.net 924 : 455 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
925 : :
926 : : /* Insert tuple into catalog. */
2672 andres@anarazel.de 927 : 455 : CatalogTupleInsert(rel, tup);
3342 peter_e@gmx.net 928 : 455 : heap_freetuple(tup);
929 : :
3341 alvherre@alvh.no-ip. 930 : 455 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
931 : :
3342 peter_e@gmx.net 932 : 455 : ObjectAddressSet(myself, PublicationRelationId, puboid);
933 : :
934 : : /* Make the changes visible. */
935 : 455 : CommandCounterIncrement();
936 : :
937 : : /* Associate objects with the publication. */
11 akapila@postgresql.o 938 :GNC 455 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
939 : : &exceptrelations, &schemaidlist);
940 : :
1438 tomas.vondra@postgre 941 [ + + ]:CBC 446 : if (stmt->for_all_tables)
942 : : {
943 : : /* Process EXCEPT table list */
11 akapila@postgresql.o 944 [ + + ]:GNC 82 : if (exceptrelations != NIL)
945 : : {
946 : : List *rels;
947 : :
948 : 28 : rels = OpenTableList(exceptrelations);
949 : 28 : PublicationAddTables(puboid, rels, true, NULL);
950 : 25 : CloseTableList(rels);
951 : : }
952 : :
953 : : /*
954 : : * Invalidate relcache so that publication info is rebuilt. Sequences
955 : : * publication doesn't require invalidation, as replica identity
956 : : * checks don't apply to them.
957 : : */
1649 akapila@postgresql.o 958 :CBC 79 : CacheInvalidateRelcacheAll();
959 : : }
157 akapila@postgresql.o 960 [ + + ]:GNC 364 : else if (!stmt->for_all_sequences)
961 : : {
962 : : /* FOR TABLES IN SCHEMA requires superuser */
1306 tgl@sss.pgh.pa.us 963 [ + + + + ]:CBC 354 : if (schemaidlist != NIL && !superuser())
1600 akapila@postgresql.o 964 [ + - ]: 3 : ereport(ERROR,
965 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
966 : : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
967 : :
1306 tgl@sss.pgh.pa.us 968 [ + + ]: 351 : if (relations != NIL)
969 : : {
970 : : List *rels;
971 : :
1438 tomas.vondra@postgre 972 : 239 : rels = OpenTableList(relations);
1482 akapila@postgresql.o 973 : 224 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
974 : : publish_via_partition_root);
975 : :
1265 alvherre@alvh.no-ip. 976 : 212 : CheckPubRelationColumnList(stmt->pubname, rels,
977 : : schemaidlist != NIL,
978 : : publish_via_partition_root);
979 : :
1438 tomas.vondra@postgre 980 : 209 : PublicationAddTables(puboid, rels, true, NULL);
981 : 196 : CloseTableList(rels);
982 : : }
983 : :
1306 tgl@sss.pgh.pa.us 984 [ + + ]: 308 : if (schemaidlist != NIL)
985 : : {
986 : : /*
987 : : * Schema lock is held until the publication is created to prevent
988 : : * concurrent schema deletion.
989 : : */
1438 tomas.vondra@postgre 990 : 76 : LockSchemaList(schemaidlist);
991 : 76 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
992 : : }
993 : : }
994 : :
2610 andres@anarazel.de 995 : 394 : table_close(rel, RowExclusiveLock);
996 : :
3342 peter_e@gmx.net 997 [ - + ]: 394 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
998 : :
999 : : /*
1000 : : * We don't need this warning message when wal_level >= 'replica' since
1001 : : * logical decoding is automatically enabled up on a logical slot
1002 : : * creation.
1003 : : */
82 msawada@postgresql.o 1004 [ + + ]:GNC 394 : if (wal_level < WAL_LEVEL_REPLICA)
2437 tmunro@postgresql.or 1005 [ + - ]:CBC 16 : ereport(WARNING,
1006 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1007 : : errmsg("logical decoding must be enabled to publish logical changes"),
1008 : : errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
1009 : :
3342 peter_e@gmx.net 1010 : 394 : return myself;
1011 : : }
1012 : :
1013 : : /*
1014 : : * Change options of a publication.
1015 : : */
1016 : : static void
1704 dean.a.rasheed@gmail 1017 : 64 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
1018 : : Relation rel, HeapTuple tup)
1019 : : {
1020 : : bool nulls[Natts_pg_publication];
1021 : : bool replaces[Natts_pg_publication];
1022 : : Datum values[Natts_pg_publication];
1023 : : bool publish_given;
1024 : : PublicationActions pubactions;
1025 : : bool publish_via_partition_root_given;
1026 : : bool publish_via_partition_root;
1027 : : bool publish_generated_columns_given;
1028 : : char publish_generated_columns;
1029 : : ObjectAddress obj;
1030 : : Form_pg_publication pubform;
1482 akapila@postgresql.o 1031 : 64 : List *root_relids = NIL;
1032 : : ListCell *lc;
1033 : :
157 akapila@postgresql.o 1034 :GNC 64 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1035 : :
1704 dean.a.rasheed@gmail 1036 :CBC 64 : parse_publication_options(pstate,
1037 : : stmt->options,
1038 : : &publish_given, &pubactions,
1039 : : &publish_via_partition_root_given,
1040 : : &publish_via_partition_root,
1041 : : &publish_generated_columns_given,
1042 : : &publish_generated_columns);
1043 : :
157 akapila@postgresql.o 1044 [ + + ]:GNC 64 : if (pubform->puballsequences &&
1045 [ + + + - : 6 : (publish_given || publish_via_partition_root_given ||
+ - ]
1046 : : publish_generated_columns_given))
1047 [ + - ]: 6 : ereport(NOTICE,
1048 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1049 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1050 : :
1051 : : /*
1052 : : * If the publication doesn't publish changes via the root partitioned
1053 : : * table, the partition's row filter and column list will be used. So
1054 : : * disallow using WHERE clause and column lists on partitioned table in
1055 : : * this case.
1056 : : */
1482 akapila@postgresql.o 1057 [ + + + + ]:CBC 64 : if (!pubform->puballtables && publish_via_partition_root_given &&
1058 [ + + ]: 40 : !publish_via_partition_root)
1059 : : {
1060 : : /*
1061 : : * Lock the publication so nobody else can do anything with it. This
1062 : : * prevents concurrent alter to add partitioned table(s) with WHERE
1063 : : * clause(s) and/or column lists which we don't allow when not
1064 : : * publishing via root.
1065 : : */
1066 : 24 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1067 : : AccessShareLock);
1068 : :
11 akapila@postgresql.o 1069 :GNC 24 : root_relids = GetIncludedPublicationRelations(pubform->oid,
1070 : : PUBLICATION_PART_ROOT);
1071 : :
1482 akapila@postgresql.o 1072 [ + - + + :CBC 42 : foreach(lc, root_relids)
+ + ]
1073 : : {
1074 : 24 : Oid relid = lfirst_oid(lc);
1075 : : HeapTuple rftuple;
1076 : : char relkind;
1077 : : char *relname;
1078 : : bool has_rowfilter;
1079 : : bool has_collist;
1080 : :
1081 : : /*
1082 : : * Beware: we don't have lock on the relations, so cope silently
1083 : : * with the cache lookups returning NULL.
1084 : : */
1085 : :
1086 : 24 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1087 : : ObjectIdGetDatum(relid),
1088 : : ObjectIdGetDatum(pubform->oid));
1432 alvherre@alvh.no-ip. 1089 [ - + ]: 24 : if (!HeapTupleIsValid(rftuple))
1432 alvherre@alvh.no-ip. 1090 :UBC 0 : continue;
1432 alvherre@alvh.no-ip. 1091 :CBC 24 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1092 : 24 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1093 [ + + + + ]: 24 : if (!has_rowfilter && !has_collist)
1094 : : {
1095 : 6 : ReleaseSysCache(rftuple);
1096 : 6 : continue;
1097 : : }
1098 : :
1099 : 18 : relkind = get_rel_relkind(relid);
1100 [ + + ]: 18 : if (relkind != RELKIND_PARTITIONED_TABLE)
1101 : : {
1102 : 12 : ReleaseSysCache(rftuple);
1103 : 12 : continue;
1104 : : }
1105 : 6 : relname = get_rel_name(relid);
1106 [ - + ]: 6 : if (relname == NULL) /* table concurrently dropped */
1107 : : {
1482 akapila@postgresql.o 1108 :UBC 0 : ReleaseSysCache(rftuple);
1432 alvherre@alvh.no-ip. 1109 : 0 : continue;
1110 : : }
1111 : :
1432 alvherre@alvh.no-ip. 1112 [ + + ]:CBC 6 : if (has_rowfilter)
1113 [ + - ]: 3 : ereport(ERROR,
1114 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1115 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1116 : : "publish_via_partition_root",
1117 : : stmt->pubname),
1118 : : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1119 : : relname, "publish_via_partition_root")));
1120 [ - + ]: 3 : Assert(has_collist);
1121 [ + - ]: 3 : ereport(ERROR,
1122 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1123 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1124 : : "publish_via_partition_root",
1125 : : stmt->pubname),
1126 : : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1127 : : relname, "publish_via_partition_root")));
1128 : : }
1129 : : }
1130 : :
1131 : : /* Everything ok, form a new tuple. */
3342 peter_e@gmx.net 1132 : 58 : memset(values, 0, sizeof(values));
1133 : 58 : memset(nulls, false, sizeof(nulls));
1134 : 58 : memset(replaces, false, sizeof(replaces));
1135 : :
3229 1136 [ + + ]: 58 : if (publish_given)
1137 : : {
2167 peter@eisentraut.org 1138 : 17 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
3342 peter_e@gmx.net 1139 : 17 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1140 : :
2167 peter@eisentraut.org 1141 : 17 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
3342 peter_e@gmx.net 1142 : 17 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1143 : :
2167 peter@eisentraut.org 1144 : 17 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
3342 peter_e@gmx.net 1145 : 17 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1146 : :
2167 peter@eisentraut.org 1147 : 17 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
2899 peter_e@gmx.net 1148 : 17 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1149 : : }
1150 : :
2167 peter@eisentraut.org 1151 [ + + ]: 58 : if (publish_via_partition_root_given)
1152 : : {
1153 : 35 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1154 : 35 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1155 : : }
1156 : :
493 akapila@postgresql.o 1157 [ + + ]: 58 : if (publish_generated_columns_given)
1158 : : {
411 1159 : 6 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1160 : 6 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1161 : : }
1162 : :
3342 peter_e@gmx.net 1163 : 58 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1164 : : replaces);
1165 : :
1166 : : /* Update the catalog. */
3330 alvherre@alvh.no-ip. 1167 : 58 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1168 : :
3342 peter_e@gmx.net 1169 : 58 : CommandCounterIncrement();
1170 : :
2672 andres@anarazel.de 1171 : 58 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1172 : :
1173 : : /* Invalidate the relcache. */
1438 tomas.vondra@postgre 1174 [ + + ]: 58 : if (pubform->puballtables)
1175 : : {
3342 peter_e@gmx.net 1176 : 10 : CacheInvalidateRelcacheAll();
1177 : : }
1178 : : else
1179 : : {
1600 akapila@postgresql.o 1180 : 48 : List *relids = NIL;
1181 : 48 : List *schemarelids = NIL;
1182 : :
1183 : : /*
1184 : : * For any partitioned tables contained in the publication, we must
1185 : : * invalidate all partitions contained in the respective partition
1186 : : * trees, not just those explicitly mentioned in the publication.
1187 : : */
1482 1188 [ + + ]: 48 : if (root_relids == NIL)
11 akapila@postgresql.o 1189 :GNC 30 : relids = GetIncludedPublicationRelations(pubform->oid,
1190 : : PUBLICATION_PART_ALL);
1191 : : else
1192 : : {
1193 : : /*
1194 : : * We already got tables explicitly mentioned in the publication.
1195 : : * Now get all partitions for the partitioned table in the list.
1196 : : */
1482 akapila@postgresql.o 1197 [ + - + + :CBC 36 : foreach(lc, root_relids)
+ + ]
1198 : 18 : relids = GetPubPartitionOptionRelations(relids,
1199 : : PUBLICATION_PART_ALL,
1200 : : lfirst_oid(lc));
1201 : : }
1202 : :
1600 1203 : 48 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1204 : : PUBLICATION_PART_ALL);
1205 : 48 : relids = list_concat_unique_oid(relids, schemarelids);
1206 : :
1635 1207 : 48 : InvalidatePublicationRels(relids);
1208 : : }
1209 : :
2672 andres@anarazel.de 1210 : 58 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
3342 peter_e@gmx.net 1211 : 58 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1212 : : (Node *) stmt);
1213 : :
2672 andres@anarazel.de 1214 [ - + ]: 58 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
3342 peter_e@gmx.net 1215 : 58 : }
1216 : :
1217 : : /*
1218 : : * Invalidate the relations.
1219 : : */
1220 : : void
1635 akapila@postgresql.o 1221 : 1262 : InvalidatePublicationRels(List *relids)
1222 : : {
1223 : : /*
1224 : : * We don't want to send too many individual messages, at some point it's
1225 : : * cheaper to just reset whole relcache.
1226 : : */
1227 [ + - ]: 1262 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1228 : : {
1229 : : ListCell *lc;
1230 : :
1231 [ + + + + : 10414 : foreach(lc, relids)
+ + ]
1232 : 9152 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1233 : : }
1234 : : else
1635 akapila@postgresql.o 1235 :UBC 0 : CacheInvalidateRelcacheAll();
1635 akapila@postgresql.o 1236 :CBC 1262 : }
1237 : :
1238 : : /*
1239 : : * Add or remove table to/from publication.
1240 : : */
1241 : : static void
1600 1242 : 459 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1243 : : List *tables, const char *queryString,
1244 : : bool publish_schema)
1245 : : {
3342 peter_e@gmx.net 1246 : 459 : List *rels = NIL;
1247 : 459 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
2672 andres@anarazel.de 1248 : 459 : Oid pubid = pubform->oid;
1249 : :
1250 : : /*
1251 : : * Nothing to do if no objects, except in SET: for that it is quite
1252 : : * possible that user has not specified any tables in which case we need
1253 : : * to remove all the existing tables.
1254 : : */
1532 alvherre@alvh.no-ip. 1255 [ + + + + ]: 459 : if (!tables && stmt->action != AP_SetObjects)
1600 akapila@postgresql.o 1256 : 38 : return;
1257 : :
1438 tomas.vondra@postgre 1258 : 421 : rels = OpenTableList(tables);
1259 : :
1532 alvherre@alvh.no-ip. 1260 [ + + ]: 421 : if (stmt->action == AP_AddObjects)
1261 : : {
1482 akapila@postgresql.o 1262 : 148 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1263 : :
1269 1264 : 139 : publish_schema |= is_schema_publication(pubid);
1265 : :
1265 alvherre@alvh.no-ip. 1266 : 139 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1269 akapila@postgresql.o 1267 : 139 : pubform->pubviaroot);
1268 : :
1438 tomas.vondra@postgre 1269 : 133 : PublicationAddTables(pubid, rels, false, stmt);
1270 : : }
1532 alvherre@alvh.no-ip. 1271 [ + + ]: 273 : else if (stmt->action == AP_DropObjects)
1438 tomas.vondra@postgre 1272 : 50 : PublicationDropTables(pubid, rels, false);
1273 : : else /* AP_SetObjects */
1274 : : {
11 akapila@postgresql.o 1275 :GNC 223 : List *oldrelids = GetIncludedPublicationRelations(pubid,
1276 : : PUBLICATION_PART_ROOT);
3342 peter_e@gmx.net 1277 :CBC 223 : List *delrels = NIL;
1278 : : ListCell *oldlc;
1279 : :
1482 akapila@postgresql.o 1280 : 223 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1281 : :
1265 alvherre@alvh.no-ip. 1282 : 214 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1269 akapila@postgresql.o 1283 : 214 : pubform->pubviaroot);
1284 : :
1285 : : /*
1286 : : * To recreate the relation list for the publication, look for
1287 : : * existing relations that do not need to be dropped.
1288 : : */
3342 peter_e@gmx.net 1289 [ + + + + : 403 : foreach(oldlc, oldrelids)
+ + ]
1290 : : {
1291 : 201 : Oid oldrelid = lfirst_oid(oldlc);
1292 : : ListCell *newlc;
1293 : : PublicationRelInfo *oldrel;
1294 : 201 : bool found = false;
1295 : : HeapTuple rftuple;
1482 akapila@postgresql.o 1296 : 201 : Node *oldrelwhereclause = NULL;
1450 tomas.vondra@postgre 1297 : 201 : Bitmapset *oldcolumns = NULL;
1298 : :
1299 : : /* look up the cache for the old relmap */
1482 akapila@postgresql.o 1300 : 201 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1301 : : ObjectIdGetDatum(oldrelid),
1302 : : ObjectIdGetDatum(pubid));
1303 : :
1304 : : /*
1305 : : * See if the existing relation currently has a WHERE clause or a
1306 : : * column list. We need to compare those too.
1307 : : */
1308 [ + - ]: 201 : if (HeapTupleIsValid(rftuple))
1309 : : {
1450 tomas.vondra@postgre 1310 : 201 : bool isnull = true;
1311 : : Datum whereClauseDatum;
1312 : : Datum columnListDatum;
1313 : :
1314 : : /* Load the WHERE clause for this table. */
1482 akapila@postgresql.o 1315 : 201 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1316 : : Anum_pg_publication_rel_prqual,
1317 : : &isnull);
1450 tomas.vondra@postgre 1318 [ + + ]: 201 : if (!isnull)
1482 akapila@postgresql.o 1319 : 105 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1320 : :
1321 : : /* Transform the int2vector column list to a bitmap. */
1450 tomas.vondra@postgre 1322 : 201 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1323 : : Anum_pg_publication_rel_prattrs,
1324 : : &isnull);
1325 : :
1326 [ + + ]: 201 : if (!isnull)
1327 : 68 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1328 : :
1482 akapila@postgresql.o 1329 : 201 : ReleaseSysCache(rftuple);
1330 : : }
1331 : :
3342 peter_e@gmx.net 1332 [ + + + + : 389 : foreach(newlc, rels)
+ + ]
1333 : : {
1334 : : PublicationRelInfo *newpubrel;
1335 : : Oid newrelid;
1403 tgl@sss.pgh.pa.us 1336 : 202 : Bitmapset *newcolumns = NULL;
1337 : :
1651 alvherre@alvh.no-ip. 1338 : 202 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1450 tomas.vondra@postgre 1339 : 202 : newrelid = RelationGetRelid(newpubrel->relation);
1340 : :
1341 : : /*
1342 : : * Validate the column list. If the column list or WHERE
1343 : : * clause changes, then the validation done here will be
1344 : : * duplicated inside PublicationAddTables(). The validation
1345 : : * is cheap enough that that seems harmless.
1346 : : */
577 drowley@postgresql.o 1347 : 202 : newcolumns = pub_collist_validate(newpubrel->relation,
1348 : : newpubrel->columns);
1349 : :
1350 : : /*
1351 : : * Check if any of the new set of relations matches with the
1352 : : * existing relations in the publication. Additionally, if the
1353 : : * relation has an associated WHERE clause, check the WHERE
1354 : : * expressions also match. Same for the column list. Drop the
1355 : : * rest.
1356 : : */
1357 [ + + ]: 196 : if (newrelid == oldrelid)
1358 : : {
1450 tomas.vondra@postgre 1359 [ + + + + ]: 142 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1360 : 40 : bms_equal(oldcolumns, newcolumns))
1361 : : {
1482 akapila@postgresql.o 1362 : 8 : found = true;
1363 : 8 : break;
1364 : : }
1365 : : }
1366 : : }
1367 : :
1368 : : /*
1369 : : * Add the non-matched relations to a list so that they can be
1370 : : * dropped.
1371 : : */
1372 [ + + ]: 195 : if (!found)
1373 : : {
95 michael@paquier.xyz 1374 :GNC 187 : oldrel = palloc_object(PublicationRelInfo);
1482 akapila@postgresql.o 1375 :CBC 187 : oldrel->whereClause = NULL;
1450 tomas.vondra@postgre 1376 : 187 : oldrel->columns = NIL;
11 akapila@postgresql.o 1377 :GNC 187 : oldrel->except = false;
1482 akapila@postgresql.o 1378 :CBC 187 : oldrel->relation = table_open(oldrelid,
1379 : : ShareUpdateExclusiveLock);
1380 : 187 : delrels = lappend(delrels, oldrel);
1381 : : }
1382 : : }
1383 : :
1384 : : /* And drop them. */
1438 tomas.vondra@postgre 1385 : 202 : PublicationDropTables(pubid, delrels, true);
1386 : :
1387 : : /*
1388 : : * Don't bother calculating the difference for adding, we'll catch and
1389 : : * skip existing ones when doing catalog update.
1390 : : */
1391 : 202 : PublicationAddTables(pubid, rels, true, stmt);
1392 : :
1393 : 202 : CloseTableList(delrels);
1394 : : }
1395 : :
1396 : 355 : CloseTableList(rels);
1397 : : }
1398 : :
1399 : : /*
1400 : : * Alter the publication schemas.
1401 : : *
1402 : : * Add or remove schemas to/from publication.
1403 : : */
1404 : : static void
1600 akapila@postgresql.o 1405 : 393 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1406 : : HeapTuple tup, List *schemaidlist)
1407 : : {
1408 : 393 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1409 : :
1410 : : /*
1411 : : * Nothing to do if no objects, except in SET: for that it is quite
1412 : : * possible that user has not specified any schemas in which case we need
1413 : : * to remove all the existing schemas.
1414 : : */
1532 alvherre@alvh.no-ip. 1415 [ + + + + ]: 393 : if (!schemaidlist && stmt->action != AP_SetObjects)
1600 akapila@postgresql.o 1416 : 153 : return;
1417 : :
1418 : : /*
1419 : : * Schema lock is held until the publication is altered to prevent
1420 : : * concurrent schema deletion.
1421 : : */
1422 : 240 : LockSchemaList(schemaidlist);
1532 alvherre@alvh.no-ip. 1423 [ + + ]: 240 : if (stmt->action == AP_AddObjects)
1424 : : {
1425 : : ListCell *lc;
1426 : : List *reloids;
1427 : :
11 akapila@postgresql.o 1428 :GNC 19 : reloids = GetIncludedPublicationRelations(pubform->oid,
1429 : : PUBLICATION_PART_ROOT);
1430 : :
1269 akapila@postgresql.o 1431 [ + + + + :CBC 27 : foreach(lc, reloids)
+ + ]
1432 : : {
1433 : : HeapTuple coltuple;
1434 : :
1435 : 11 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1436 : : ObjectIdGetDatum(lfirst_oid(lc)),
1437 : : ObjectIdGetDatum(pubform->oid));
1438 : :
1439 [ - + ]: 11 : if (!HeapTupleIsValid(coltuple))
1269 akapila@postgresql.o 1440 :UBC 0 : continue;
1441 : :
1442 : : /*
1443 : : * Disallow adding schema if column list is already part of the
1444 : : * publication. See CheckPubRelationColumnList.
1445 : : */
1269 akapila@postgresql.o 1446 [ + + ]:CBC 11 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1447 [ + - ]: 3 : ereport(ERROR,
1448 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1449 : : errmsg("cannot add schema to publication \"%s\"",
1450 : : stmt->pubname),
1451 : : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1452 : :
1453 : 8 : ReleaseSysCache(coltuple);
1454 : : }
1455 : :
1438 tomas.vondra@postgre 1456 : 16 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1457 : : }
1532 alvherre@alvh.no-ip. 1458 [ + + ]: 221 : else if (stmt->action == AP_DropObjects)
1438 tomas.vondra@postgre 1459 : 19 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1460 : : else /* AP_SetObjects */
1461 : : {
1462 : 202 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1600 akapila@postgresql.o 1463 : 202 : List *delschemas = NIL;
1464 : :
1465 : : /* Identify which schemas should be dropped */
1466 : 202 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1467 : :
1468 : : /*
1469 : : * Schema lock is held until the publication is altered to prevent
1470 : : * concurrent schema deletion.
1471 : : */
1472 : 202 : LockSchemaList(delschemas);
1473 : :
1474 : : /* And drop them */
1438 tomas.vondra@postgre 1475 : 202 : PublicationDropSchemas(pubform->oid, delschemas, true);
1476 : :
1477 : : /*
1478 : : * Don't bother calculating the difference for adding, we'll catch and
1479 : : * skip existing ones when doing catalog update.
1480 : : */
1481 : 202 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1482 : : }
1483 : : }
1484 : :
1485 : : /*
1486 : : * Check if relations and schemas can be in a given publication and throw
1487 : : * appropriate error if not.
1488 : : */
1489 : : static void
1600 akapila@postgresql.o 1490 : 480 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1491 : : List *tables, List *schemaidlist)
1492 : : {
1493 : 480 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1494 : :
1532 alvherre@alvh.no-ip. 1495 [ + + + + : 480 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
+ + ]
1438 tomas.vondra@postgre 1496 [ + + ]: 52 : schemaidlist && !superuser())
1600 akapila@postgresql.o 1497 [ + - ]: 3 : ereport(ERROR,
1498 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1499 : : errmsg("must be superuser to add or set schemas")));
1500 : :
1501 : : /*
1502 : : * Check that user is allowed to manipulate the publication tables in
1503 : : * schema
1504 : : */
157 akapila@postgresql.o 1505 [ + + + + :GNC 477 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
- + ]
1506 : : {
1507 [ + - - + ]: 9 : if (pubform->puballtables && pubform->puballsequences)
157 akapila@postgresql.o 1508 [ # # ]:UNC 0 : ereport(ERROR,
1509 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1510 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1511 : : NameStr(pubform->pubname)),
1512 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
157 akapila@postgresql.o 1513 [ + - ]:GNC 9 : else if (pubform->puballtables)
1514 [ + - ]: 9 : ereport(ERROR,
1515 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1516 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1517 : : NameStr(pubform->pubname)),
1518 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1519 : : else
157 akapila@postgresql.o 1520 [ # # ]:UNC 0 : ereport(ERROR,
1521 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1522 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1523 : : NameStr(pubform->pubname)),
1524 : : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1525 : : }
1526 : :
1527 : : /* Check that user is allowed to manipulate the publication tables. */
157 akapila@postgresql.o 1528 [ + + + + :GNC 468 : if (tables && (pubform->puballtables || pubform->puballsequences))
- + ]
1529 : : {
1530 [ + - - + ]: 9 : if (pubform->puballtables && pubform->puballsequences)
157 akapila@postgresql.o 1531 [ # # ]:UNC 0 : ereport(ERROR,
1532 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1533 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1534 : : NameStr(pubform->pubname)),
1535 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
157 akapila@postgresql.o 1536 [ + - ]:GNC 9 : else if (pubform->puballtables)
1537 [ + - ]: 9 : ereport(ERROR,
1538 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1539 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1540 : : NameStr(pubform->pubname)),
1541 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1542 : : else
157 akapila@postgresql.o 1543 [ # # ]:UNC 0 : ereport(ERROR,
1544 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1545 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1546 : : NameStr(pubform->pubname)),
1547 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1548 : : }
1600 akapila@postgresql.o 1549 :CBC 459 : }
1550 : :
1551 : : /*
1552 : : * Alter the existing publication.
1553 : : *
1554 : : * This is dispatcher function for AlterPublicationOptions,
1555 : : * AlterPublicationSchemas and AlterPublicationTables.
1556 : : */
1557 : : void
1704 dean.a.rasheed@gmail 1558 : 553 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1559 : : {
1560 : : Relation rel;
1561 : : HeapTuple tup;
1562 : : Form_pg_publication pubform;
1563 : :
2610 andres@anarazel.de 1564 : 553 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1565 : :
3342 peter_e@gmx.net 1566 : 553 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1567 : : CStringGetDatum(stmt->pubname));
1568 : :
1569 [ - + ]: 553 : if (!HeapTupleIsValid(tup))
3342 peter_e@gmx.net 1570 [ # # ]:UBC 0 : ereport(ERROR,
1571 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1572 : : errmsg("publication \"%s\" does not exist",
1573 : : stmt->pubname)));
1574 : :
2672 andres@anarazel.de 1575 :CBC 553 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1576 : :
1577 : : /* must be owner */
1218 peter@eisentraut.org 1578 [ - + ]: 553 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
3025 peter_e@gmx.net 1579 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
3342 1580 : 0 : stmt->pubname);
1581 : :
3342 peter_e@gmx.net 1582 [ + + ]:CBC 553 : if (stmt->options)
1704 dean.a.rasheed@gmail 1583 : 64 : AlterPublicationOptions(pstate, stmt, rel, tup);
1584 : : else
1585 : : {
1438 tomas.vondra@postgre 1586 : 489 : List *relations = NIL;
11 akapila@postgresql.o 1587 :GNC 489 : List *exceptrelations = NIL;
1438 tomas.vondra@postgre 1588 :CBC 489 : List *schemaidlist = NIL;
1482 akapila@postgresql.o 1589 : 489 : Oid pubid = pubform->oid;
1590 : :
1438 tomas.vondra@postgre 1591 : 489 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1592 : : &exceptrelations, &schemaidlist);
1593 : :
1594 : : /* EXCEPT clause is not supported with ALTER PUBLICATION */
11 akapila@postgresql.o 1595 [ - + ]:GNC 480 : Assert(exceptrelations == NIL);
1596 : :
1438 tomas.vondra@postgre 1597 :CBC 480 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1598 : :
1482 akapila@postgresql.o 1599 : 459 : heap_freetuple(tup);
1600 : :
1601 : : /* Lock the publication so nobody else can do anything with it. */
1602 : 459 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1603 : : AccessExclusiveLock);
1604 : :
1605 : : /*
1606 : : * It is possible that by the time we acquire the lock on publication,
1607 : : * concurrent DDL has removed it. We can test this by checking the
1608 : : * existence of publication. We get the tuple again to avoid the risk
1609 : : * of any publication option getting changed.
1610 : : */
1611 : 459 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1612 [ - + ]: 459 : if (!HeapTupleIsValid(tup))
1600 akapila@postgresql.o 1613 [ # # ]:UBC 0 : ereport(ERROR,
1614 : : errcode(ERRCODE_UNDEFINED_OBJECT),
1615 : : errmsg("publication \"%s\" does not exist",
1616 : : stmt->pubname));
1617 : :
1269 akapila@postgresql.o 1618 :CBC 459 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1619 : : schemaidlist != NIL);
1438 tomas.vondra@postgre 1620 : 393 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1621 : : }
1622 : :
1623 : : /* Cleanup. */
3342 peter_e@gmx.net 1624 : 442 : heap_freetuple(tup);
2610 andres@anarazel.de 1625 : 442 : table_close(rel, RowExclusiveLock);
3342 peter_e@gmx.net 1626 : 442 : }
1627 : :
1628 : : /*
1629 : : * Remove relation from publication by mapping OID.
1630 : : */
1631 : : void
1632 : 459 : RemovePublicationRelById(Oid proid)
1633 : : {
1634 : : Relation rel;
1635 : : HeapTuple tup;
1636 : : Form_pg_publication_rel pubrel;
1635 akapila@postgresql.o 1637 : 459 : List *relids = NIL;
1638 : :
2610 andres@anarazel.de 1639 : 459 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1640 : :
3342 peter_e@gmx.net 1641 : 459 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1642 : :
1643 [ - + ]: 459 : if (!HeapTupleIsValid(tup))
3342 peter_e@gmx.net 1644 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication table %u",
1645 : : proid);
1646 : :
3342 peter_e@gmx.net 1647 :CBC 459 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1648 : :
1649 : : /*
1650 : : * Invalidate relcache so that publication info is rebuilt.
1651 : : *
1652 : : * For the partitioned tables, we must invalidate all partitions contained
1653 : : * in the respective partition hierarchies, not just the one explicitly
1654 : : * mentioned in the publication. This is required because we implicitly
1655 : : * publish the child tables when the parent table is published.
1656 : : */
1635 akapila@postgresql.o 1657 : 459 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1658 : : pubrel->prrelid);
1659 : :
1660 : 459 : InvalidatePublicationRels(relids);
1661 : :
3329 tgl@sss.pgh.pa.us 1662 : 459 : CatalogTupleDelete(rel, &tup->t_self);
1663 : :
3342 peter_e@gmx.net 1664 : 459 : ReleaseSysCache(tup);
1665 : :
2610 andres@anarazel.de 1666 : 459 : table_close(rel, RowExclusiveLock);
3342 peter_e@gmx.net 1667 : 459 : }
1668 : :
1669 : : /*
1670 : : * Remove the publication by mapping OID.
1671 : : */
1672 : : void
1649 akapila@postgresql.o 1673 : 274 : RemovePublicationById(Oid pubid)
1674 : : {
1675 : : Relation rel;
1676 : : HeapTuple tup;
1677 : : Form_pg_publication pubform;
1678 : :
1679 : 274 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1680 : :
1681 : 274 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1682 [ - + ]: 274 : if (!HeapTupleIsValid(tup))
1649 akapila@postgresql.o 1683 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1684 : :
1536 alvherre@alvh.no-ip. 1685 :CBC 274 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1686 : :
1687 : : /* Invalidate relcache so that publication info is rebuilt. */
1438 tomas.vondra@postgre 1688 [ + + ]: 274 : if (pubform->puballtables)
1649 akapila@postgresql.o 1689 : 58 : CacheInvalidateRelcacheAll();
1690 : :
1691 : 274 : CatalogTupleDelete(rel, &tup->t_self);
1692 : :
1693 : 274 : ReleaseSysCache(tup);
1694 : :
1695 : 274 : table_close(rel, RowExclusiveLock);
1696 : 274 : }
1697 : :
1698 : : /*
1699 : : * Remove schema from publication by mapping OID.
1700 : : */
1701 : : void
1600 1702 : 96 : RemovePublicationSchemaById(Oid psoid)
1703 : : {
1704 : : Relation rel;
1705 : : HeapTuple tup;
1706 : 96 : List *schemaRels = NIL;
1707 : : Form_pg_publication_namespace pubsch;
1708 : :
1709 : 96 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1710 : :
1711 : 96 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1712 : :
1713 [ - + ]: 96 : if (!HeapTupleIsValid(tup))
1600 akapila@postgresql.o 1714 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1715 : :
1600 akapila@postgresql.o 1716 :CBC 96 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1717 : :
1718 : : /*
1719 : : * Invalidate relcache so that publication info is rebuilt. See
1720 : : * RemovePublicationRelById for why we need to consider all the
1721 : : * partitions.
1722 : : */
1723 : 96 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1724 : : PUBLICATION_PART_ALL);
1725 : 96 : InvalidatePublicationRels(schemaRels);
1726 : :
1727 : 96 : CatalogTupleDelete(rel, &tup->t_self);
1728 : :
1729 : 96 : ReleaseSysCache(tup);
1730 : :
1731 : 96 : table_close(rel, RowExclusiveLock);
1732 : 96 : }
1733 : :
1734 : : /*
1735 : : * Open relations specified by a PublicationTable list.
1736 : : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1737 : : * add them to a publication.
1738 : : */
1739 : : static List *
1438 tomas.vondra@postgre 1740 : 688 : OpenTableList(List *tables)
1741 : : {
3342 peter_e@gmx.net 1742 : 688 : List *relids = NIL;
1438 tomas.vondra@postgre 1743 : 688 : List *rels = NIL;
1744 : : ListCell *lc;
1482 akapila@postgresql.o 1745 : 688 : List *relids_with_rf = NIL;
1450 tomas.vondra@postgre 1746 : 688 : List *relids_with_collist = NIL;
1747 : :
1748 : : /*
1749 : : * Open, share-lock, and check all the explicitly-specified relations
1750 : : */
1438 1751 [ + + + + : 1416 : foreach(lc, tables)
+ + ]
1752 : : {
1651 alvherre@alvh.no-ip. 1753 : 743 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1754 : 743 : bool recurse = t->relation->inh;
1755 : : Relation rel;
1756 : : Oid myrelid;
1757 : : PublicationRelInfo *pub_rel;
1758 : :
1759 : : /* Allow query cancel in case this takes a long time */
3342 peter_e@gmx.net 1760 [ - + ]: 743 : CHECK_FOR_INTERRUPTS();
1761 : :
1651 alvherre@alvh.no-ip. 1762 : 743 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
3342 peter_e@gmx.net 1763 : 740 : myrelid = RelationGetRelid(rel);
1764 : :
1765 : : /*
1766 : : * Filter out duplicates if user specifies "foo, foo".
1767 : : *
1768 : : * Note that this algorithm is known to not be very efficient (O(N^2))
1769 : : * but given that it only works on list of tables given to us by user
1770 : : * it's deemed acceptable.
1771 : : */
1772 [ + + ]: 740 : if (list_member_oid(relids, myrelid))
1773 : : {
1774 : : /* Disallow duplicate tables if there are any with row filters. */
1482 akapila@postgresql.o 1775 [ + + + + ]: 12 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1776 [ + - ]: 6 : ereport(ERROR,
1777 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1778 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1779 : : RelationGetRelationName(rel))));
1780 : :
1781 : : /* Disallow duplicate tables if there are any with column lists. */
1450 tomas.vondra@postgre 1782 [ + + + - ]: 6 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1783 [ + - ]: 6 : ereport(ERROR,
1784 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1785 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1786 : : RelationGetRelationName(rel))));
1787 : :
2610 andres@anarazel.de 1788 :UBC 0 : table_close(rel, ShareUpdateExclusiveLock);
3342 peter_e@gmx.net 1789 : 0 : continue;
1790 : : }
1791 : :
95 michael@paquier.xyz 1792 :GNC 728 : pub_rel = palloc_object(PublicationRelInfo);
1651 alvherre@alvh.no-ip. 1793 :CBC 728 : pub_rel->relation = rel;
1482 akapila@postgresql.o 1794 : 728 : pub_rel->whereClause = t->whereClause;
1450 tomas.vondra@postgre 1795 : 728 : pub_rel->columns = t->columns;
11 akapila@postgresql.o 1796 :GNC 728 : pub_rel->except = t->except;
1438 tomas.vondra@postgre 1797 :CBC 728 : rels = lappend(rels, pub_rel);
3342 peter_e@gmx.net 1798 : 728 : relids = lappend_oid(relids, myrelid);
1799 : :
1482 akapila@postgresql.o 1800 [ + + ]: 728 : if (t->whereClause)
1801 : 202 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1802 : :
1450 tomas.vondra@postgre 1803 [ + + ]: 728 : if (t->columns)
1804 : 205 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1805 : :
1806 : : /*
1807 : : * Add children of this rel, if requested, so that they too are added
1808 : : * to the publication. A partitioned table can't have any inheritance
1809 : : * children other than its partitions, which need not be explicitly
1810 : : * added to the publication.
1811 : : */
2196 peter@eisentraut.org 1812 [ + + + + ]: 728 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1813 : : {
1814 : : List *children;
1815 : : ListCell *child;
1816 : :
3342 peter_e@gmx.net 1817 : 607 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1818 : : NULL);
1819 : :
1820 [ + - + + : 1226 : foreach(child, children)
+ + ]
1821 : : {
1822 : 619 : Oid childrelid = lfirst_oid(child);
1823 : :
1824 : : /* Allow query cancel in case this takes a long time */
2655 tgl@sss.pgh.pa.us 1825 [ - + ]: 619 : CHECK_FOR_INTERRUPTS();
1826 : :
1827 : : /*
1828 : : * Skip duplicates if user specified both parent and child
1829 : : * tables.
1830 : : */
3342 peter_e@gmx.net 1831 [ + + ]: 619 : if (list_member_oid(relids, childrelid))
1832 : : {
1833 : : /*
1834 : : * We don't allow to specify row filter for both parent
1835 : : * and child table at the same time as it is not very
1836 : : * clear which one should be given preference.
1837 : : */
1482 akapila@postgresql.o 1838 [ - + ]: 607 : if (childrelid != myrelid &&
1482 akapila@postgresql.o 1839 [ # # # # ]:UBC 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1840 [ # # ]: 0 : ereport(ERROR,
1841 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1842 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1843 : : RelationGetRelationName(rel))));
1844 : :
1845 : : /*
1846 : : * We don't allow to specify column list for both parent
1847 : : * and child table at the same time as it is not very
1848 : : * clear which one should be given preference.
1849 : : */
1450 tomas.vondra@postgre 1850 [ - + ]:CBC 607 : if (childrelid != myrelid &&
1450 tomas.vondra@postgre 1851 [ # # # # ]:UBC 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1852 [ # # ]: 0 : ereport(ERROR,
1853 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1854 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1855 : : RelationGetRelationName(rel))));
1856 : :
3342 peter_e@gmx.net 1857 :CBC 607 : continue;
1858 : : }
1859 : :
1860 : : /* find_all_inheritors already got lock */
2610 andres@anarazel.de 1861 : 12 : rel = table_open(childrelid, NoLock);
95 michael@paquier.xyz 1862 :GNC 12 : pub_rel = palloc_object(PublicationRelInfo);
1651 alvherre@alvh.no-ip. 1863 :CBC 12 : pub_rel->relation = rel;
1864 : : /* child inherits WHERE clause from parent */
1482 akapila@postgresql.o 1865 : 12 : pub_rel->whereClause = t->whereClause;
1866 : :
1867 : : /* child inherits column list from parent */
1450 tomas.vondra@postgre 1868 : 12 : pub_rel->columns = t->columns;
11 akapila@postgresql.o 1869 :GNC 12 : pub_rel->except = t->except;
1438 tomas.vondra@postgre 1870 :CBC 12 : rels = lappend(rels, pub_rel);
3342 peter_e@gmx.net 1871 : 12 : relids = lappend_oid(relids, childrelid);
1872 : :
1482 akapila@postgresql.o 1873 [ + + ]: 12 : if (t->whereClause)
1874 : 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1875 : :
1450 tomas.vondra@postgre 1876 [ - + ]: 12 : if (t->columns)
1450 tomas.vondra@postgre 1877 :UBC 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1878 : : }
1879 : : }
1880 : : }
1881 : :
3342 peter_e@gmx.net 1882 :CBC 673 : list_free(relids);
1482 akapila@postgresql.o 1883 : 673 : list_free(relids_with_rf);
1884 : :
1438 tomas.vondra@postgre 1885 : 673 : return rels;
1886 : : }
1887 : :
1888 : : /*
1889 : : * Close all relations in the list.
1890 : : */
1891 : : static void
1892 : 778 : CloseTableList(List *rels)
1893 : : {
1894 : : ListCell *lc;
1895 : :
3342 peter_e@gmx.net 1896 [ + + + + : 1596 : foreach(lc, rels)
+ + ]
1897 : : {
1898 : : PublicationRelInfo *pub_rel;
1899 : :
1651 alvherre@alvh.no-ip. 1900 : 818 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1901 : 818 : table_close(pub_rel->relation, NoLock);
1902 : : }
1903 : :
1482 akapila@postgresql.o 1904 : 778 : list_free_deep(rels);
3342 peter_e@gmx.net 1905 : 778 : }
1906 : :
1907 : : /*
1908 : : * Lock the schemas specified in the schema list in AccessShareLock mode in
1909 : : * order to prevent concurrent schema deletion.
1910 : : */
1911 : : static void
1600 akapila@postgresql.o 1912 : 518 : LockSchemaList(List *schemalist)
1913 : : {
1914 : : ListCell *lc;
1915 : :
1916 [ + + + + : 680 : foreach(lc, schemalist)
+ + ]
1917 : : {
1918 : 162 : Oid schemaid = lfirst_oid(lc);
1919 : :
1920 : : /* Allow query cancel in case this takes a long time */
1921 [ - + ]: 162 : CHECK_FOR_INTERRUPTS();
1922 : 162 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1923 : :
1924 : : /*
1925 : : * It is possible that by the time we acquire the lock on schema,
1926 : : * concurrent DDL has removed it. We can test this by checking the
1927 : : * existence of schema.
1928 : : */
1929 [ - + ]: 162 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1600 akapila@postgresql.o 1930 [ # # ]:UBC 0 : ereport(ERROR,
1931 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
1932 : : errmsg("schema with OID %u does not exist", schemaid));
1933 : : }
1600 akapila@postgresql.o 1934 :CBC 518 : }
1935 : :
1936 : : /*
1937 : : * Add listed tables to the publication.
1938 : : */
1939 : : static void
1438 tomas.vondra@postgre 1940 : 572 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1941 : : AlterPublicationStmt *stmt)
1942 : : {
1943 : : ListCell *lc;
1944 : :
3342 peter_e@gmx.net 1945 [ + + + + : 1159 : foreach(lc, rels)
+ + ]
1946 : : {
1651 alvherre@alvh.no-ip. 1947 : 624 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1948 : 624 : Relation rel = pub_rel->relation;
1949 : : ObjectAddress obj;
1950 : :
1951 : : /* Must be owner of the table or superuser. */
1218 peter@eisentraut.org 1952 [ + + ]: 624 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
3025 peter_e@gmx.net 1953 : 3 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
3342 1954 : 3 : RelationGetRelationName(rel));
1955 : :
1651 alvherre@alvh.no-ip. 1956 : 621 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
3342 peter_e@gmx.net 1957 [ + + ]: 587 : if (stmt)
1958 : : {
1959 : 314 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1960 : : (Node *) stmt);
1961 : :
1962 [ - + ]: 314 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1963 : : obj.objectId, 0);
1964 : : }
1965 : : }
1966 : 535 : }
1967 : :
1968 : : /*
1969 : : * Remove listed tables from the publication.
1970 : : */
1971 : : static void
1438 tomas.vondra@postgre 1972 : 252 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1973 : : {
1974 : : ObjectAddress obj;
1975 : : ListCell *lc;
1976 : : Oid prid;
1977 : :
3342 peter_e@gmx.net 1978 [ + + + + : 483 : foreach(lc, rels)
+ + ]
1979 : : {
1651 alvherre@alvh.no-ip. 1980 : 240 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1981 : 240 : Relation rel = pubrel->relation;
3342 peter_e@gmx.net 1982 : 240 : Oid relid = RelationGetRelid(rel);
1983 : :
1450 tomas.vondra@postgre 1984 [ - + ]: 240 : if (pubrel->columns)
1450 tomas.vondra@postgre 1985 [ # # ]:UBC 0 : ereport(ERROR,
1986 : : errcode(ERRCODE_SYNTAX_ERROR),
1987 : : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1988 : :
2672 andres@anarazel.de 1989 :CBC 240 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1990 : : ObjectIdGetDatum(relid),
1991 : : ObjectIdGetDatum(pubid));
3342 peter_e@gmx.net 1992 [ + + ]: 240 : if (!OidIsValid(prid))
1993 : : {
1994 [ - + ]: 6 : if (missing_ok)
3342 peter_e@gmx.net 1995 :UBC 0 : continue;
1996 : :
3342 peter_e@gmx.net 1997 [ + - ]:CBC 6 : ereport(ERROR,
1998 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1999 : : errmsg("relation \"%s\" is not part of the publication",
2000 : : RelationGetRelationName(rel))));
2001 : : }
2002 : :
1482 akapila@postgresql.o 2003 [ + + ]: 234 : if (pubrel->whereClause)
2004 [ + - ]: 3 : ereport(ERROR,
2005 : : (errcode(ERRCODE_SYNTAX_ERROR),
2006 : : errmsg("cannot use a WHERE clause when removing a table from a publication")));
2007 : :
3342 peter_e@gmx.net 2008 : 231 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
2009 : 231 : performDeletion(&obj, DROP_CASCADE, 0);
2010 : : }
2011 : 243 : }
2012 : :
2013 : : /*
2014 : : * Add listed schemas to the publication.
2015 : : */
2016 : : static void
1438 tomas.vondra@postgre 2017 : 294 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
2018 : : AlterPublicationStmt *stmt)
2019 : : {
2020 : : ListCell *lc;
2021 : :
1600 akapila@postgresql.o 2022 [ + + + + : 419 : foreach(lc, schemas)
+ + ]
2023 : : {
2024 : 131 : Oid schemaid = lfirst_oid(lc);
2025 : : ObjectAddress obj;
2026 : :
1438 tomas.vondra@postgre 2027 : 131 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1600 akapila@postgresql.o 2028 [ + + ]: 125 : if (stmt)
2029 : : {
2030 : 34 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
2031 : : (Node *) stmt);
2032 : :
2033 [ - + ]: 34 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2034 : : obj.objectId, 0);
2035 : : }
2036 : : }
2037 : 288 : }
2038 : :
2039 : : /*
2040 : : * Remove listed schemas from the publication.
2041 : : */
2042 : : static void
1438 tomas.vondra@postgre 2043 : 221 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2044 : : {
2045 : : ObjectAddress obj;
2046 : : ListCell *lc;
2047 : : Oid psid;
2048 : :
1600 akapila@postgresql.o 2049 [ + + + + : 246 : foreach(lc, schemas)
+ + ]
2050 : : {
2051 : 28 : Oid schemaid = lfirst_oid(lc);
2052 : :
1438 tomas.vondra@postgre 2053 : 28 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2054 : : Anum_pg_publication_namespace_oid,
2055 : : ObjectIdGetDatum(schemaid),
2056 : : ObjectIdGetDatum(pubid));
1600 akapila@postgresql.o 2057 [ + + ]: 28 : if (!OidIsValid(psid))
2058 : : {
2059 [ - + ]: 3 : if (missing_ok)
1600 akapila@postgresql.o 2060 :UBC 0 : continue;
2061 : :
1600 akapila@postgresql.o 2062 [ + - ]:CBC 3 : ereport(ERROR,
2063 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2064 : : errmsg("tables from schema \"%s\" are not part of the publication",
2065 : : get_namespace_name(schemaid))));
2066 : : }
2067 : :
2068 : 25 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2069 : 25 : performDeletion(&obj, DROP_CASCADE, 0);
2070 : : }
2071 : 218 : }
2072 : :
2073 : : /*
2074 : : * Internal workhorse for changing a publication owner
2075 : : */
2076 : : static void
3342 peter_e@gmx.net 2077 : 18 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2078 : : {
2079 : : Form_pg_publication form;
2080 : :
2081 : 18 : form = (Form_pg_publication) GETSTRUCT(tup);
2082 : :
2083 [ + + ]: 18 : if (form->pubowner == newOwnerId)
2084 : 6 : return;
2085 : :
3317 2086 [ + + ]: 12 : if (!superuser())
2087 : : {
2088 : : AclResult aclresult;
2089 : :
2090 : : /* Must be owner */
1218 peter@eisentraut.org 2091 [ - + ]: 6 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
3025 peter_e@gmx.net 2092 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
3317 2093 : 0 : NameStr(form->pubname));
2094 : :
2095 : : /* Must be able to become new owner */
1213 rhaas@postgresql.org 2096 :CBC 6 : check_can_set_role(GetUserId(), newOwnerId);
2097 : :
2098 : : /* New owner must have CREATE privilege on database */
1218 peter@eisentraut.org 2099 : 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
3317 peter_e@gmx.net 2100 [ - + ]: 6 : if (aclresult != ACLCHECK_OK)
3025 peter_e@gmx.net 2101 :UBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
3317 2102 : 0 : get_database_name(MyDatabaseId));
2103 : :
157 akapila@postgresql.o 2104 [ + + ]:GNC 6 : if (!superuser_arg(newOwnerId))
2105 : : {
2106 [ + - + - : 6 : if (form->puballtables || form->puballsequences ||
+ - ]
2107 : 3 : is_schema_publication(form->oid))
2108 [ + - ]: 3 : ereport(ERROR,
2109 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2110 : : errmsg("permission denied to change owner of publication \"%s\"",
2111 : : NameStr(form->pubname)),
2112 : : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2113 : : }
2114 : : }
2115 : :
3342 peter_e@gmx.net 2116 :CBC 9 : form->pubowner = newOwnerId;
3330 alvherre@alvh.no-ip. 2117 : 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2118 : :
2119 : : /* Update owner dependency reference */
3342 peter_e@gmx.net 2120 : 9 : changeDependencyOnOwner(PublicationRelationId,
2121 : : form->oid,
2122 : : newOwnerId);
2123 : :
2124 [ - + ]: 9 : InvokeObjectPostAlterHook(PublicationRelationId,
2125 : : form->oid, 0);
2126 : : }
2127 : :
2128 : : /*
2129 : : * Change publication owner -- by name
2130 : : */
2131 : : ObjectAddress
2132 : 18 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2133 : : {
2134 : : Oid pubid;
2135 : : HeapTuple tup;
2136 : : Relation rel;
2137 : : ObjectAddress address;
2138 : : Form_pg_publication pubform;
2139 : :
2610 andres@anarazel.de 2140 : 18 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2141 : :
3342 peter_e@gmx.net 2142 : 18 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2143 : :
2144 [ - + ]: 18 : if (!HeapTupleIsValid(tup))
3342 peter_e@gmx.net 2145 [ # # ]:UBC 0 : ereport(ERROR,
2146 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2147 : : errmsg("publication \"%s\" does not exist", name)));
2148 : :
2672 andres@anarazel.de 2149 :CBC 18 : pubform = (Form_pg_publication) GETSTRUCT(tup);
362 akapila@postgresql.o 2150 : 18 : pubid = pubform->oid;
2151 : :
3342 peter_e@gmx.net 2152 : 18 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2153 : :
362 akapila@postgresql.o 2154 : 15 : ObjectAddressSet(address, PublicationRelationId, pubid);
2155 : :
3342 peter_e@gmx.net 2156 : 15 : heap_freetuple(tup);
2157 : :
2610 andres@anarazel.de 2158 : 15 : table_close(rel, RowExclusiveLock);
2159 : :
3342 peter_e@gmx.net 2160 : 15 : return address;
2161 : : }
2162 : :
2163 : : /*
2164 : : * Change publication owner -- by OID
2165 : : */
2166 : : void
362 akapila@postgresql.o 2167 :UBC 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2168 : : {
2169 : : HeapTuple tup;
2170 : : Relation rel;
2171 : :
2610 andres@anarazel.de 2172 : 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2173 : :
362 akapila@postgresql.o 2174 : 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2175 : :
3342 peter_e@gmx.net 2176 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
2177 [ # # ]: 0 : ereport(ERROR,
2178 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2179 : : errmsg("publication with OID %u does not exist", pubid)));
2180 : :
2181 : 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2182 : :
2183 : 0 : heap_freetuple(tup);
2184 : :
2610 andres@anarazel.de 2185 : 0 : table_close(rel, RowExclusiveLock);
3342 peter_e@gmx.net 2186 : 0 : }
2187 : :
2188 : : /*
2189 : : * Extract the publish_generated_columns option value from a DefElem. "stored"
2190 : : * and "none" values are accepted.
2191 : : */
2192 : : static char
416 akapila@postgresql.o 2193 :CBC 38 : defGetGeneratedColsOption(DefElem *def)
2194 : : {
222 2195 : 38 : char *sval = "";
2196 : :
2197 : : /*
2198 : : * A parameter value is required.
2199 : : */
2200 [ + + ]: 38 : if (def->arg)
2201 : : {
2202 : 35 : sval = defGetString(def);
2203 : :
2204 [ + + ]: 35 : if (pg_strcasecmp(sval, "none") == 0)
2205 : 11 : return PUBLISH_GENCOLS_NONE;
2206 [ + + ]: 24 : if (pg_strcasecmp(sval, "stored") == 0)
2207 : 21 : return PUBLISH_GENCOLS_STORED;
2208 : : }
2209 : :
416 2210 [ + - ]: 6 : ereport(ERROR,
2211 : : errcode(ERRCODE_SYNTAX_ERROR),
2212 : : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2213 : : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2214 : :
2215 : : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2216 : : }
|