Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * publicationcmds.c
4 : : * publication manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/commands/publicationcmds.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/htup_details.h"
18 : : #include "access/table.h"
19 : : #include "access/xact.h"
20 : : #include "catalog/catalog.h"
21 : : #include "catalog/indexing.h"
22 : : #include "catalog/namespace.h"
23 : : #include "catalog/objectaccess.h"
24 : : #include "catalog/objectaddress.h"
25 : : #include "catalog/pg_database.h"
26 : : #include "catalog/pg_inherits.h"
27 : : #include "catalog/pg_namespace.h"
28 : : #include "catalog/pg_proc.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "commands/defrem.h"
33 : : #include "commands/event_trigger.h"
34 : : #include "commands/publicationcmds.h"
35 : : #include "miscadmin.h"
36 : : #include "nodes/nodeFuncs.h"
37 : : #include "parser/parse_clause.h"
38 : : #include "parser/parse_collate.h"
39 : : #include "parser/parse_relation.h"
40 : : #include "rewrite/rewriteHandler.h"
41 : : #include "storage/lmgr.h"
42 : : #include "utils/acl.h"
43 : : #include "utils/builtins.h"
44 : : #include "utils/inval.h"
45 : : #include "utils/lsyscache.h"
46 : : #include "utils/rel.h"
47 : : #include "utils/syscache.h"
48 : : #include "utils/varlena.h"
49 : :
50 : :
51 : : /*
52 : : * Information used to validate the columns in the row filter expression. See
53 : : * contain_invalid_rfcolumn_walker for details.
54 : : */
55 : : typedef struct rf_context
56 : : {
57 : : Bitmapset *bms_replident; /* bitset of replica identity columns */
58 : : bool pubviaroot; /* true if we are validating the parent
59 : : * relation's row filter */
60 : : Oid relid; /* relid of the relation */
61 : : Oid parentid; /* relid of the parent relation */
62 : : } rf_context;
63 : :
64 : : static List *OpenTableList(List *tables);
65 : : static void CloseTableList(List *rels);
66 : : static void LockSchemaList(List *schemalist);
67 : : static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
68 : : AlterPublicationStmt *stmt);
69 : : static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
70 : : static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
71 : : AlterPublicationStmt *stmt);
72 : : static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
73 : : static char defGetGeneratedColsOption(DefElem *def);
74 : :
75 : :
76 : : static void
1565 dean.a.rasheed@gmail 77 :CBC 503 : parse_publication_options(ParseState *pstate,
78 : : List *options,
79 : : bool *publish_given,
80 : : PublicationActions *pubactions,
81 : : bool *publish_via_partition_root_given,
82 : : bool *publish_via_partition_root,
83 : : bool *publish_generated_columns_given,
84 : : char *publish_generated_columns)
85 : : {
86 : : ListCell *lc;
87 : :
3090 peter_e@gmx.net 88 : 503 : *publish_given = false;
2028 peter@eisentraut.org 89 : 503 : *publish_via_partition_root_given = false;
354 akapila@postgresql.o 90 : 503 : *publish_generated_columns_given = false;
91 : :
92 : : /* defaults */
2028 peter@eisentraut.org 93 : 503 : pubactions->pubinsert = true;
94 : 503 : pubactions->pubupdate = true;
95 : 503 : pubactions->pubdelete = true;
96 : 503 : pubactions->pubtruncate = true;
97 : 503 : *publish_via_partition_root = false;
277 akapila@postgresql.o 98 : 503 : *publish_generated_columns = PUBLISH_GENCOLS_NONE;
99 : :
100 : : /* Parse options */
3085 bruce@momjian.us 101 [ + + + + : 685 : foreach(lc, options)
+ + ]
102 : : {
3203 peter_e@gmx.net 103 : 200 : DefElem *defel = (DefElem *) lfirst(lc);
104 : :
3090 105 [ + + ]: 200 : if (strcmp(defel->defname, "publish") == 0)
106 : : {
107 : : char *publish;
108 : : List *publish_list;
109 : : ListCell *lc2;
110 : :
111 [ - + ]: 70 : if (*publish_given)
1565 dean.a.rasheed@gmail 112 :UBC 0 : errorConflictingDefElem(defel, pstate);
113 : :
114 : : /*
115 : : * If publish option was given only the explicitly listed actions
116 : : * should be published.
117 : : */
2028 peter@eisentraut.org 118 :CBC 70 : pubactions->pubinsert = false;
119 : 70 : pubactions->pubupdate = false;
120 : 70 : pubactions->pubdelete = false;
121 : 70 : pubactions->pubtruncate = false;
122 : :
3090 peter_e@gmx.net 123 : 70 : *publish_given = true;
124 : 70 : publish = defGetString(defel);
125 : :
126 [ - + ]: 70 : if (!SplitIdentifierString(publish, ',', &publish_list))
3203 peter_e@gmx.net 127 [ # # ]:UBC 0 : ereport(ERROR,
128 : : (errcode(ERRCODE_SYNTAX_ERROR),
129 : : errmsg("invalid list syntax in parameter \"%s\"",
130 : : "publish")));
131 : :
132 : : /* Process the option list. */
1118 drowley@postgresql.o 133 [ + + + + :CBC 167 : foreach(lc2, publish_list)
+ + ]
134 : : {
135 : 100 : char *publish_opt = (char *) lfirst(lc2);
136 : :
3090 peter_e@gmx.net 137 [ + + ]: 100 : if (strcmp(publish_opt, "insert") == 0)
2028 peter@eisentraut.org 138 : 62 : pubactions->pubinsert = true;
3090 peter_e@gmx.net 139 [ + + ]: 38 : else if (strcmp(publish_opt, "update") == 0)
2028 peter@eisentraut.org 140 : 15 : pubactions->pubupdate = true;
3090 peter_e@gmx.net 141 [ + + ]: 23 : else if (strcmp(publish_opt, "delete") == 0)
2028 peter@eisentraut.org 142 : 10 : pubactions->pubdelete = true;
2760 peter_e@gmx.net 143 [ + + ]: 13 : else if (strcmp(publish_opt, "truncate") == 0)
2028 peter@eisentraut.org 144 : 10 : pubactions->pubtruncate = true;
145 : : else
3090 peter_e@gmx.net 146 [ + - ]: 3 : ereport(ERROR,
147 : : (errcode(ERRCODE_SYNTAX_ERROR),
148 : : errmsg("unrecognized value for publication option \"%s\": \"%s\"",
149 : : "publish", publish_opt)));
150 : : }
151 : : }
2028 peter@eisentraut.org 152 [ + + ]: 130 : else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
153 : : {
154 [ + + ]: 86 : if (*publish_via_partition_root_given)
1565 dean.a.rasheed@gmail 155 : 3 : errorConflictingDefElem(defel, pstate);
2028 peter@eisentraut.org 156 : 83 : *publish_via_partition_root_given = true;
157 : 83 : *publish_via_partition_root = defGetBoolean(defel);
158 : : }
354 akapila@postgresql.o 159 [ + + ]: 44 : else if (strcmp(defel->defname, "publish_generated_columns") == 0)
160 : : {
161 [ + + ]: 41 : if (*publish_generated_columns_given)
162 : 3 : errorConflictingDefElem(defel, pstate);
163 : 38 : *publish_generated_columns_given = true;
277 164 : 38 : *publish_generated_columns = defGetGeneratedColsOption(defel);
165 : : }
166 : : else
3090 peter_e@gmx.net 167 [ + - ]: 3 : ereport(ERROR,
168 : : (errcode(ERRCODE_SYNTAX_ERROR),
169 : : errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
170 : : }
3203 171 : 485 : }
172 : :
173 : : /*
174 : : * Convert the PublicationObjSpecType list into schema oid list and
175 : : * PublicationTable list.
176 : : */
177 : : static void
1461 akapila@postgresql.o 178 : 848 : ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
179 : : List **rels, List **schemas)
180 : : {
181 : : ListCell *cell;
182 : : PublicationObjSpec *pubobj;
183 : :
184 [ + + ]: 848 : if (!pubobjspec_list)
185 : 52 : return;
186 : :
187 [ + - + + : 1689 : foreach(cell, pubobjspec_list)
+ + ]
188 : : {
189 : : Oid schemaid;
190 : : List *search_path;
191 : :
192 : 911 : pubobj = (PublicationObjSpec *) lfirst(cell);
193 : :
194 [ + + + - ]: 911 : switch (pubobj->pubobjtype)
195 : : {
196 : 713 : case PUBLICATIONOBJ_TABLE:
1299 tomas.vondra@postgre 197 : 713 : *rels = lappend(*rels, pubobj->pubtable);
1461 akapila@postgresql.o 198 : 713 : break;
1397 alvherre@alvh.no-ip. 199 : 186 : case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
1461 akapila@postgresql.o 200 : 186 : schemaid = get_namespace_oid(pubobj->name, false);
201 : :
202 : : /* Filter out duplicates if user specifies "sch1, sch1" */
1299 tomas.vondra@postgre 203 : 171 : *schemas = list_append_unique_oid(*schemas, schemaid);
1461 akapila@postgresql.o 204 : 171 : break;
1397 alvherre@alvh.no-ip. 205 : 12 : case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
1461 akapila@postgresql.o 206 : 12 : search_path = fetch_search_path(false);
207 [ + + ]: 12 : if (search_path == NIL) /* nothing valid in search_path? */
208 [ + - ]: 3 : ereport(ERROR,
209 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
210 : : errmsg("no schema has been selected for CURRENT_SCHEMA"));
211 : :
212 : 9 : schemaid = linitial_oid(search_path);
213 : 9 : list_free(search_path);
214 : :
215 : : /* Filter out duplicates if user specifies "sch1, sch1" */
1299 tomas.vondra@postgre 216 : 9 : *schemas = list_append_unique_oid(*schemas, schemaid);
1461 akapila@postgresql.o 217 : 9 : break;
1461 akapila@postgresql.o 218 :UBC 0 : default:
219 : : /* shouldn't happen */
220 [ # # ]: 0 : elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
221 : : break;
222 : : }
223 : : }
224 : : }
225 : :
226 : : /*
227 : : * Returns true if any of the columns used in the row filter WHERE expression is
228 : : * not part of REPLICA IDENTITY, false otherwise.
229 : : */
230 : : static bool
1343 akapila@postgresql.o 231 :CBC 129 : contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
232 : : {
233 [ - + ]: 129 : if (node == NULL)
1343 akapila@postgresql.o 234 :UBC 0 : return false;
235 : :
1343 akapila@postgresql.o 236 [ + + ]:CBC 129 : if (IsA(node, Var))
237 : : {
238 : 52 : Var *var = (Var *) node;
239 : 52 : AttrNumber attnum = var->varattno;
240 : :
241 : : /*
242 : : * If pubviaroot is true, we are validating the row filter of the
243 : : * parent table, but the bitmap contains the replica identity
244 : : * information of the child table. So, get the column number of the
245 : : * child table as parent and child column order could be different.
246 : : */
247 [ + + ]: 52 : if (context->pubviaroot)
248 : : {
249 : 8 : char *colname = get_attname(context->parentid, attnum, false);
250 : :
251 : 8 : attnum = get_attnum(context->relid, colname);
252 : : }
253 : :
254 [ + + ]: 52 : if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
255 : 52 : context->bms_replident))
256 : 30 : return true;
257 : : }
258 : :
259 : 99 : return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
260 : : context);
261 : : }
262 : :
263 : : /*
264 : : * Check if all columns referenced in the filter expression are part of the
265 : : * REPLICA IDENTITY index or not.
266 : : *
267 : : * Returns true if any invalid column is found.
268 : : */
269 : : bool
1311 tomas.vondra@postgre 270 : 362 : pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
271 : : bool pubviaroot)
272 : : {
273 : : HeapTuple rftuple;
1343 akapila@postgresql.o 274 : 362 : Oid relid = RelationGetRelid(relation);
275 : 362 : Oid publish_as_relid = RelationGetRelid(relation);
276 : 362 : bool result = false;
277 : : Datum rfdatum;
278 : : bool rfisnull;
279 : :
280 : : /*
281 : : * FULL means all columns are in the REPLICA IDENTITY, so all columns are
282 : : * allowed in the row filter and we can skip the validation.
283 : : */
284 [ + + ]: 362 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
285 : 96 : return false;
286 : :
287 : : /*
288 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
289 : : * is published via this publication as we need to use its row filter
290 : : * expression to filter the partition's changes.
291 : : *
292 : : * Note that even though the row filter used is for an ancestor, the
293 : : * REPLICA IDENTITY used will be for the actual child table.
294 : : */
295 [ + + + + ]: 266 : if (pubviaroot && relation->rd_rel->relispartition)
296 : : {
297 : : publish_as_relid
1321 tomas.vondra@postgre 298 : 54 : = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
299 : :
1343 akapila@postgresql.o 300 [ + + ]: 54 : if (!OidIsValid(publish_as_relid))
301 : 3 : publish_as_relid = relid;
302 : : }
303 : :
304 : 266 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
305 : : ObjectIdGetDatum(publish_as_relid),
306 : : ObjectIdGetDatum(pubid));
307 : :
308 [ + + ]: 266 : if (!HeapTupleIsValid(rftuple))
309 : 28 : return false;
310 : :
311 : 238 : rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
312 : : Anum_pg_publication_rel_prqual,
313 : : &rfisnull);
314 : :
315 [ + + ]: 238 : if (!rfisnull)
316 : : {
317 : 51 : rf_context context = {0};
318 : : Node *rfnode;
319 : 51 : Bitmapset *bms = NULL;
320 : :
321 : 51 : context.pubviaroot = pubviaroot;
322 : 51 : context.parentid = publish_as_relid;
323 : 51 : context.relid = relid;
324 : :
325 : : /* Remember columns that are part of the REPLICA IDENTITY */
326 : 51 : bms = RelationGetIndexAttrBitmap(relation,
327 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
328 : :
329 : 51 : context.bms_replident = bms;
330 : 51 : rfnode = stringToNode(TextDatumGetCString(rfdatum));
331 : 51 : result = contain_invalid_rfcolumn_walker(rfnode, &context);
332 : : }
333 : :
334 : 238 : ReleaseSysCache(rftuple);
335 : :
336 : 238 : return result;
337 : : }
338 : :
339 : : /*
340 : : * Check for invalid columns in the publication table definition.
341 : : *
342 : : * This function evaluates two conditions:
343 : : *
344 : : * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
345 : : * by the column list. If any column is missing, *invalid_column_list is set
346 : : * to true.
347 : : * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
348 : : * are published, either by being explicitly named in the column list or, if
349 : : * no column list is specified, by setting the option
350 : : * publish_generated_columns to stored. If any unpublished
351 : : * generated column is found, *invalid_gen_col is set to true.
352 : : *
353 : : * Returns true if any of the above conditions are not met.
354 : : */
355 : : bool
327 356 : 470 : pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
357 : : bool pubviaroot, char pubgencols_type,
358 : : bool *invalid_column_list,
359 : : bool *invalid_gen_col)
360 : : {
1311 tomas.vondra@postgre 361 : 470 : Oid relid = RelationGetRelid(relation);
362 : 470 : Oid publish_as_relid = RelationGetRelid(relation);
363 : : Bitmapset *idattrs;
327 akapila@postgresql.o 364 : 470 : Bitmapset *columns = NULL;
365 : 470 : TupleDesc desc = RelationGetDescr(relation);
366 : : Publication *pub;
367 : : int x;
368 : :
369 : 470 : *invalid_column_list = false;
370 : 470 : *invalid_gen_col = false;
371 : :
372 : : /*
373 : : * For a partition, if pubviaroot is true, find the topmost ancestor that
374 : : * is published via this publication as we need to use its column list for
375 : : * the changes.
376 : : *
377 : : * Note that even though the column list used is for an ancestor, the
378 : : * REPLICA IDENTITY used will be for the actual child table.
379 : : */
1311 tomas.vondra@postgre 380 [ + + + + ]: 470 : if (pubviaroot && relation->rd_rel->relispartition)
381 : : {
382 : 80 : publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
383 : :
384 [ + + ]: 80 : if (!OidIsValid(publish_as_relid))
385 : 18 : publish_as_relid = relid;
386 : : }
387 : :
388 : : /* Fetch the column list */
327 akapila@postgresql.o 389 : 470 : pub = GetPublication(pubid);
390 : 470 : check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns);
391 : :
392 [ + + ]: 470 : if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
393 : : {
394 : : /* With REPLICA IDENTITY FULL, no column list is allowed. */
395 : 103 : *invalid_column_list = (columns != NULL);
396 : :
397 : : /*
398 : : * As we don't allow a column list with REPLICA IDENTITY FULL, the
399 : : * publish_generated_columns option must be set to stored if the table
400 : : * has any stored generated columns.
401 : : */
277 402 [ + + ]: 103 : if (pubgencols_type != PUBLISH_GENCOLS_STORED &&
327 403 [ + + ]: 97 : relation->rd_att->constr &&
404 [ + + ]: 41 : relation->rd_att->constr->has_generated_stored)
405 : 3 : *invalid_gen_col = true;
406 : :
407 : : /*
408 : : * Virtual generated columns are currently not supported for logical
409 : : * replication at all.
410 : : */
262 peter@eisentraut.org 411 [ + + ]: 103 : if (relation->rd_att->constr &&
412 [ + + ]: 47 : relation->rd_att->constr->has_generated_virtual)
413 : 6 : *invalid_gen_col = true;
414 : :
327 akapila@postgresql.o 415 [ + + - + ]: 103 : if (*invalid_gen_col && *invalid_column_list)
327 akapila@postgresql.o 416 :UBC 0 : return true;
417 : : }
418 : :
419 : : /* Remember columns that are part of the REPLICA IDENTITY */
327 akapila@postgresql.o 420 :CBC 470 : idattrs = RelationGetIndexAttrBitmap(relation,
421 : : INDEX_ATTR_BITMAP_IDENTITY_KEY);
422 : :
423 : : /*
424 : : * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
425 : : * (to handle system columns the usual way), while column list does not
426 : : * use offset, so we can't do bms_is_subset(). Instead, we have to loop
427 : : * over the idattrs and check all of them are in the list.
428 : : */
429 : 470 : x = -1;
430 [ + + ]: 800 : while ((x = bms_next_member(idattrs, x)) >= 0)
431 : : {
432 : 333 : AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
433 : 333 : Form_pg_attribute att = TupleDescAttr(desc, attnum - 1);
434 : :
435 [ + + ]: 333 : if (columns == NULL)
436 : : {
437 : : /*
438 : : * The publish_generated_columns option must be set to stored if
439 : : * the REPLICA IDENTITY contains any stored generated column.
440 : : */
262 peter@eisentraut.org 441 [ + + + - ]: 228 : if (att->attgenerated == ATTRIBUTE_GENERATED_STORED && pubgencols_type != PUBLISH_GENCOLS_STORED)
442 : : {
443 : 3 : *invalid_gen_col = true;
444 : 3 : break;
445 : : }
446 : :
447 : : /*
448 : : * The equivalent setting for virtual generated columns does not
449 : : * exist yet.
450 : : */
451 [ - + ]: 225 : if (att->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
452 : : {
327 akapila@postgresql.o 453 :UBC 0 : *invalid_gen_col = true;
454 : 0 : break;
455 : : }
456 : :
457 : : /* Skip validating the column list since it is not defined */
327 akapila@postgresql.o 458 :CBC 225 : continue;
459 : : }
460 : :
461 : : /*
462 : : * If pubviaroot is true, we are validating the column list of the
463 : : * parent table, but the bitmap contains the replica identity
464 : : * information of the child table. The parent/child attnums may not
465 : : * match, so translate them to the parent - get the attname from the
466 : : * child, and look it up in the parent.
467 : : */
468 [ + + ]: 105 : if (pubviaroot)
469 : : {
470 : : /* attribute name in the child table */
471 : 40 : char *colname = get_attname(relid, attnum, false);
472 : :
473 : : /*
474 : : * Determine the attnum for the attribute name in parent (we are
475 : : * using the column list defined on the parent).
476 : : */
477 : 40 : attnum = get_attnum(publish_as_relid, colname);
478 : : }
479 : :
480 : : /* replica identity column, not covered by the column list */
481 : 105 : *invalid_column_list |= !bms_is_member(attnum, columns);
482 : :
483 [ + + - + ]: 105 : if (*invalid_column_list && *invalid_gen_col)
327 akapila@postgresql.o 484 :UBC 0 : break;
485 : : }
486 : :
327 akapila@postgresql.o 487 :CBC 470 : bms_free(columns);
488 : 470 : bms_free(idattrs);
489 : :
490 [ + + + + ]: 470 : return *invalid_column_list || *invalid_gen_col;
491 : : }
492 : :
493 : : /*
494 : : * Invalidate entries in the RelationSyncCache for relations included in the
495 : : * specified publication, either via FOR TABLE or FOR TABLES IN SCHEMA.
496 : : *
497 : : * If 'puballtables' is true, invalidate all cache entries.
498 : : */
499 : : void
228 500 : 18 : InvalidatePubRelSyncCache(Oid pubid, bool puballtables)
501 : : {
502 [ + + ]: 18 : if (puballtables)
503 : : {
504 : 3 : CacheInvalidateRelSyncAll();
505 : : }
506 : : else
507 : : {
508 : 15 : List *relids = NIL;
509 : 15 : List *schemarelids = NIL;
510 : :
511 : : /*
512 : : * For partitioned tables, we must invalidate all partitions and
513 : : * itself. WAL records for INSERT/UPDATE/DELETE specify leaf tables as
514 : : * a target. However, WAL records for TRUNCATE specify both a root and
515 : : * its leaves.
516 : : */
517 : 15 : relids = GetPublicationRelations(pubid,
518 : : PUBLICATION_PART_ALL);
519 : 15 : schemarelids = GetAllSchemaPublicationRelations(pubid,
520 : : PUBLICATION_PART_ALL);
521 : :
522 : 15 : relids = list_concat_unique_oid(relids, schemarelids);
523 : :
524 : : /* Invalidate the relsyncache */
525 [ + + + + : 33 : foreach_oid(relid, relids)
+ + ]
526 : 3 : CacheInvalidateRelSync(relid);
527 : : }
528 : :
529 : 18 : return;
530 : : }
531 : :
532 : : /* check_functions_in_node callback */
533 : : static bool
1343 534 : 218 : contain_mutable_or_user_functions_checker(Oid func_id, void *context)
535 : : {
536 [ + + + + ]: 218 : return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
537 : : func_id >= FirstNormalObjectId);
538 : : }
539 : :
540 : : /*
541 : : * The row filter walker checks if the row filter expression is a "simple
542 : : * expression".
543 : : *
544 : : * It allows only simple or compound expressions such as:
545 : : * - (Var Op Const)
546 : : * - (Var Op Var)
547 : : * - (Var Op Const) AND/OR (Var Op Const)
548 : : * - etc
549 : : * (where Var is a column of the table this filter belongs to)
550 : : *
551 : : * The simple expression has the following restrictions:
552 : : * - User-defined operators are not allowed;
553 : : * - User-defined functions are not allowed;
554 : : * - User-defined types are not allowed;
555 : : * - User-defined collations are not allowed;
556 : : * - Non-immutable built-in functions are not allowed;
557 : : * - System columns are not allowed.
558 : : *
559 : : * NOTES
560 : : *
561 : : * We don't allow user-defined functions/operators/types/collations because
562 : : * (a) if a user drops a user-defined object used in a row filter expression or
563 : : * if there is any other error while using it, the logical decoding
564 : : * infrastructure won't be able to recover from such an error even if the
565 : : * object is recreated again because a historic snapshot is used to evaluate
566 : : * the row filter;
567 : : * (b) a user-defined function can be used to access tables that could have
568 : : * unpleasant results because a historic snapshot is used. That's why only
569 : : * immutable built-in functions are allowed in row filter expressions.
570 : : *
571 : : * We don't allow system columns because currently, we don't have that
572 : : * information in the tuple passed to downstream. Also, as we don't replicate
573 : : * those to subscribers, there doesn't seem to be a need for a filter on those
574 : : * columns.
575 : : *
576 : : * We can allow other node types after more analysis and testing.
577 : : */
578 : : static bool
579 : 720 : check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
580 : : {
581 : 720 : char *errdetail_msg = NULL;
582 : :
583 [ + + ]: 720 : if (node == NULL)
584 : 3 : return false;
585 : :
586 [ + + + + : 717 : switch (nodeTag(node))
+ + ]
587 : : {
588 : 194 : case T_Var:
589 : : /* System columns are not allowed. */
590 [ + + ]: 194 : if (((Var *) node)->varattno < InvalidAttrNumber)
591 : 3 : errdetail_msg = _("System columns are not allowed.");
592 : 194 : break;
593 : 196 : case T_OpExpr:
594 : : case T_DistinctExpr:
595 : : case T_NullIfExpr:
596 : : /* OK, except user-defined operators are not allowed. */
597 [ + + ]: 196 : if (((OpExpr *) node)->opno >= FirstNormalObjectId)
598 : 3 : errdetail_msg = _("User-defined operators are not allowed.");
599 : 196 : break;
600 : 3 : case T_ScalarArrayOpExpr:
601 : : /* OK, except user-defined operators are not allowed. */
602 [ - + ]: 3 : if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
1343 akapila@postgresql.o 603 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
604 : :
605 : : /*
606 : : * We don't need to check the hashfuncid and negfuncid of
607 : : * ScalarArrayOpExpr as those functions are only built for a
608 : : * subquery.
609 : : */
1343 akapila@postgresql.o 610 :CBC 3 : break;
611 : 3 : case T_RowCompareExpr:
612 : : {
613 : : ListCell *opid;
614 : :
615 : : /* OK, except user-defined operators are not allowed. */
616 [ + - + + : 9 : foreach(opid, ((RowCompareExpr *) node)->opnos)
+ + ]
617 : : {
618 [ - + ]: 6 : if (lfirst_oid(opid) >= FirstNormalObjectId)
619 : : {
1343 akapila@postgresql.o 620 :UBC 0 : errdetail_msg = _("User-defined operators are not allowed.");
621 : 0 : break;
622 : : }
623 : : }
624 : : }
1343 akapila@postgresql.o 625 :CBC 3 : break;
626 : 318 : case T_Const:
627 : : case T_FuncExpr:
628 : : case T_BoolExpr:
629 : : case T_RelabelType:
630 : : case T_CollateExpr:
631 : : case T_CaseExpr:
632 : : case T_CaseTestExpr:
633 : : case T_ArrayExpr:
634 : : case T_RowExpr:
635 : : case T_CoalesceExpr:
636 : : case T_MinMaxExpr:
637 : : case T_XmlExpr:
638 : : case T_NullTest:
639 : : case T_BooleanTest:
640 : : case T_List:
641 : : /* OK, supported */
642 : 318 : break;
643 : 3 : default:
1129 peter@eisentraut.org 644 : 3 : errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
1343 akapila@postgresql.o 645 : 3 : break;
646 : : }
647 : :
648 : : /*
649 : : * For all the supported nodes, if we haven't already found a problem,
650 : : * check the types, functions, and collations used in it. We check List
651 : : * by walking through each element.
652 : : */
1125 alvherre@alvh.no-ip. 653 [ + + + + ]: 717 : if (!errdetail_msg && !IsA(node, List))
654 : : {
655 [ + + ]: 681 : if (exprType(node) >= FirstNormalObjectId)
656 : 3 : errdetail_msg = _("User-defined types are not allowed.");
657 [ + + ]: 678 : else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
658 : : pstate))
659 : 6 : errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
660 [ + - + + ]: 1344 : else if (exprCollation(node) >= FirstNormalObjectId ||
661 : 672 : exprInputCollation(node) >= FirstNormalObjectId)
662 : 3 : errdetail_msg = _("User-defined collations are not allowed.");
663 : : }
664 : :
665 : : /*
666 : : * If we found a problem in this node, throw error now. Otherwise keep
667 : : * going.
668 : : */
1343 akapila@postgresql.o 669 [ + + ]: 717 : if (errdetail_msg)
670 [ + - ]: 21 : ereport(ERROR,
671 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
672 : : errmsg("invalid publication WHERE expression"),
673 : : errdetail_internal("%s", errdetail_msg),
674 : : parser_errposition(pstate, exprLocation(node))));
675 : :
676 : 696 : return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
677 : : pstate);
678 : : }
679 : :
680 : : /*
681 : : * Check if the row filter expression is a "simple expression".
682 : : *
683 : : * See check_simple_rowfilter_expr_walker for details.
684 : : */
685 : : static bool
686 : 188 : check_simple_rowfilter_expr(Node *node, ParseState *pstate)
687 : : {
688 : 188 : return check_simple_rowfilter_expr_walker(node, pstate);
689 : : }
690 : :
691 : : /*
692 : : * Transform the publication WHERE expression for all the relations in the list,
693 : : * ensuring it is coerced to boolean and necessary collation information is
694 : : * added if required, and add a new nsitem/RTE for the associated relation to
695 : : * the ParseState's namespace list.
696 : : *
697 : : * Also check the publication row filter expression and throw an error if
698 : : * anything not permitted or unexpected is encountered.
699 : : */
700 : : static void
701 : 591 : TransformPubWhereClauses(List *tables, const char *queryString,
702 : : bool pubviaroot)
703 : : {
704 : : ListCell *lc;
705 : :
706 [ + + + + : 1189 : foreach(lc, tables)
+ + ]
707 : : {
708 : : ParseNamespaceItem *nsitem;
709 : 628 : Node *whereclause = NULL;
710 : : ParseState *pstate;
711 : 628 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
712 : :
713 [ + + ]: 628 : if (pri->whereClause == NULL)
714 : 431 : continue;
715 : :
716 : : /*
717 : : * If the publication doesn't publish changes via the root partitioned
718 : : * table, the partition's row filter will be used. So disallow using
719 : : * WHERE clause on partitioned table in this case.
720 : : */
721 [ + + ]: 197 : if (!pubviaroot &&
722 [ + + ]: 182 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
723 [ + - ]: 3 : ereport(ERROR,
724 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
725 : : errmsg("cannot use publication WHERE clause for relation \"%s\"",
726 : : RelationGetRelationName(pri->relation)),
727 : : errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
728 : : "publish_via_partition_root")));
729 : :
730 : : /*
731 : : * A fresh pstate is required so that we only have "this" table in its
732 : : * rangetable
733 : : */
734 : 194 : pstate = make_parsestate(NULL);
735 : 194 : pstate->p_sourcetext = queryString;
736 : 194 : nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
737 : : AccessShareLock, NULL,
738 : : false, false);
739 : 194 : addNSItemToQuery(pstate, nsitem, false, true, true);
740 : :
741 : 194 : whereclause = transformWhereClause(pstate,
742 : 194 : copyObject(pri->whereClause),
743 : : EXPR_KIND_WHERE,
744 : : "PUBLICATION WHERE");
745 : :
746 : : /* Fix up collation information */
747 : 188 : assign_expr_collations(pstate, whereclause);
748 : :
262 peter@eisentraut.org 749 : 188 : whereclause = expand_generated_columns_in_expr(whereclause, pri->relation, 1);
750 : :
751 : : /*
752 : : * We allow only simple expressions in row filters. See
753 : : * check_simple_rowfilter_expr_walker.
754 : : */
1343 akapila@postgresql.o 755 : 188 : check_simple_rowfilter_expr(whereclause, pstate);
756 : :
757 : 167 : free_parsestate(pstate);
758 : :
759 : 167 : pri->whereClause = whereclause;
760 : : }
761 : 561 : }
762 : :
763 : :
764 : : /*
765 : : * Given a list of tables that are going to be added to a publication,
766 : : * verify that they fulfill the necessary preconditions, namely: no tables
767 : : * have a column list if any schema is published; and partitioned tables do
768 : : * not have column lists if publish_via_partition_root is not set.
769 : : *
770 : : * 'publish_schema' indicates that the publication contains any TABLES IN
771 : : * SCHEMA elements (newly added in this command, or preexisting).
772 : : * 'pubviaroot' is the value of publish_via_partition_root.
773 : : */
774 : : static void
1126 alvherre@alvh.no-ip. 775 : 561 : CheckPubRelationColumnList(char *pubname, List *tables,
776 : : bool publish_schema, bool pubviaroot)
777 : : {
778 : : ListCell *lc;
779 : :
1311 tomas.vondra@postgre 780 [ + + + + : 1144 : foreach(lc, tables)
+ + ]
781 : : {
782 : 598 : PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
783 : :
784 [ + + ]: 598 : if (pri->columns == NIL)
785 : 396 : continue;
786 : :
787 : : /*
788 : : * Disallow specifying column list if any schema is in the
789 : : * publication.
790 : : *
791 : : * XXX We could instead just forbid the case when the publication
792 : : * tries to publish the table with a column list and a schema for that
793 : : * table. However, if we do that then we need a restriction during
794 : : * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
795 : : * seem to be a good idea.
796 : : */
1130 akapila@postgresql.o 797 [ + + ]: 202 : if (publish_schema)
798 [ + - ]: 12 : ereport(ERROR,
799 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
800 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
801 : : get_namespace_name(RelationGetNamespace(pri->relation)),
802 : : RelationGetRelationName(pri->relation), pubname),
803 : : errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
804 : :
805 : : /*
806 : : * If the publication doesn't publish changes via the root partitioned
807 : : * table, the partition's column list will be used. So disallow using
808 : : * a column list on the partitioned table in this case.
809 : : */
1311 tomas.vondra@postgre 810 [ + + ]: 190 : if (!pubviaroot &&
811 [ + + ]: 152 : pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
812 [ + - ]: 3 : ereport(ERROR,
813 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
814 : : errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
815 : : get_namespace_name(RelationGetNamespace(pri->relation)),
816 : : RelationGetRelationName(pri->relation), pubname),
817 : : errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
818 : : "publish_via_partition_root")));
819 : : }
820 : 546 : }
821 : :
822 : : /*
823 : : * Create new publication.
824 : : */
825 : : ObjectAddress
1565 dean.a.rasheed@gmail 826 : 451 : CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
827 : : {
828 : : Relation rel;
829 : : ObjectAddress myself;
830 : : Oid puboid;
831 : : bool nulls[Natts_pg_publication];
832 : : Datum values[Natts_pg_publication];
833 : : HeapTuple tup;
834 : : bool publish_given;
835 : : PublicationActions pubactions;
836 : : bool publish_via_partition_root_given;
837 : : bool publish_via_partition_root;
838 : : bool publish_generated_columns_given;
839 : : char publish_generated_columns;
840 : : AclResult aclresult;
1299 tomas.vondra@postgre 841 : 451 : List *relations = NIL;
842 : 451 : List *schemaidlist = NIL;
843 : :
844 : : /* must have CREATE privilege on database */
1079 peter@eisentraut.org 845 : 451 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, GetUserId(), ACL_CREATE);
3203 peter_e@gmx.net 846 [ + + ]: 451 : if (aclresult != ACLCHECK_OK)
2886 847 : 3 : aclcheck_error(aclresult, OBJECT_DATABASE,
3203 848 : 3 : get_database_name(MyDatabaseId));
849 : :
850 : : /* FOR ALL TABLES and FOR ALL SEQUENCES requires superuser */
18 akapila@postgresql.o 851 [ + + ]:GNC 448 : if (!superuser())
852 : : {
853 [ + - - + ]: 12 : if (stmt->for_all_tables || stmt->for_all_sequences)
18 akapila@postgresql.o 854 [ # # ]:UNC 0 : ereport(ERROR,
855 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
856 : : errmsg("must be superuser to create a FOR ALL TABLES or ALL SEQUENCES publication"));
857 : : }
858 : :
2471 andres@anarazel.de 859 :CBC 448 : rel = table_open(PublicationRelationId, RowExclusiveLock);
860 : :
861 : : /* Check if name is used */
2533 862 : 448 : puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
863 : : CStringGetDatum(stmt->pubname));
3203 peter_e@gmx.net 864 [ + + ]: 448 : if (OidIsValid(puboid))
865 [ + - ]: 3 : ereport(ERROR,
866 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
867 : : errmsg("publication \"%s\" already exists",
868 : : stmt->pubname)));
869 : :
870 : : /* Form a tuple. */
871 : 445 : memset(values, 0, sizeof(values));
872 : 445 : memset(nulls, false, sizeof(nulls));
873 : :
874 : 445 : values[Anum_pg_publication_pubname - 1] =
875 : 445 : DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
876 : 445 : values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
877 : :
1565 dean.a.rasheed@gmail 878 : 445 : parse_publication_options(pstate,
879 : : stmt->options,
880 : : &publish_given, &pubactions,
881 : : &publish_via_partition_root_given,
882 : : &publish_via_partition_root,
883 : : &publish_generated_columns_given,
884 : : &publish_generated_columns);
885 : :
18 akapila@postgresql.o 886 [ + + ]:GNC 427 : if (stmt->for_all_sequences &&
887 [ + + + - : 18 : (publish_given || publish_via_partition_root_given ||
+ + ]
888 : : publish_generated_columns_given))
889 [ + - ]: 7 : ereport(NOTICE,
890 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
891 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
892 : :
2533 andres@anarazel.de 893 :CBC 427 : puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
894 : : Anum_pg_publication_oid);
895 : 427 : values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
3203 peter_e@gmx.net 896 : 427 : values[Anum_pg_publication_puballtables - 1] =
1299 tomas.vondra@postgre 897 : 427 : BoolGetDatum(stmt->for_all_tables);
18 akapila@postgresql.o 898 :GNC 427 : values[Anum_pg_publication_puballsequences - 1] =
899 : 427 : BoolGetDatum(stmt->for_all_sequences);
3203 peter_e@gmx.net 900 :CBC 427 : values[Anum_pg_publication_pubinsert - 1] =
2028 peter@eisentraut.org 901 : 427 : BoolGetDatum(pubactions.pubinsert);
3203 peter_e@gmx.net 902 : 427 : values[Anum_pg_publication_pubupdate - 1] =
2028 peter@eisentraut.org 903 : 427 : BoolGetDatum(pubactions.pubupdate);
3203 peter_e@gmx.net 904 : 427 : values[Anum_pg_publication_pubdelete - 1] =
2028 peter@eisentraut.org 905 : 427 : BoolGetDatum(pubactions.pubdelete);
2760 peter_e@gmx.net 906 : 427 : values[Anum_pg_publication_pubtruncate - 1] =
2028 peter@eisentraut.org 907 : 427 : BoolGetDatum(pubactions.pubtruncate);
908 : 427 : values[Anum_pg_publication_pubviaroot - 1] =
909 : 427 : BoolGetDatum(publish_via_partition_root);
272 akapila@postgresql.o 910 : 427 : values[Anum_pg_publication_pubgencols - 1] =
277 911 : 427 : CharGetDatum(publish_generated_columns);
912 : :
3203 peter_e@gmx.net 913 : 427 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
914 : :
915 : : /* Insert tuple into catalog. */
2533 andres@anarazel.de 916 : 427 : CatalogTupleInsert(rel, tup);
3203 peter_e@gmx.net 917 : 427 : heap_freetuple(tup);
918 : :
3202 alvherre@alvh.no-ip. 919 : 427 : recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
920 : :
3203 peter_e@gmx.net 921 : 427 : ObjectAddressSet(myself, PublicationRelationId, puboid);
922 : :
923 : : /* Make the changes visible. */
924 : 427 : CommandCounterIncrement();
925 : :
926 : : /* Associate objects with the publication. */
1299 tomas.vondra@postgre 927 [ + + ]: 427 : if (stmt->for_all_tables)
928 : : {
929 : : /*
930 : : * Invalidate relcache so that publication info is rebuilt. Sequences
931 : : * publication doesn't require invalidation, as replica identity
932 : : * checks don't apply to them.
933 : : */
1510 akapila@postgresql.o 934 : 57 : CacheInvalidateRelcacheAll();
935 : : }
18 akapila@postgresql.o 936 [ + + ]:GNC 370 : else if (!stmt->for_all_sequences)
937 : : {
1299 tomas.vondra@postgre 938 :CBC 359 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
939 : : &schemaidlist);
940 : :
941 : : /* FOR TABLES IN SCHEMA requires superuser */
1167 tgl@sss.pgh.pa.us 942 [ + + + + ]: 350 : if (schemaidlist != NIL && !superuser())
1461 akapila@postgresql.o 943 [ + - ]: 3 : ereport(ERROR,
944 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
945 : : errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
946 : :
1167 tgl@sss.pgh.pa.us 947 [ + + ]: 347 : if (relations != NIL)
948 : : {
949 : : List *rels;
950 : :
1299 tomas.vondra@postgre 951 : 235 : rels = OpenTableList(relations);
1343 akapila@postgresql.o 952 : 220 : TransformPubWhereClauses(rels, pstate->p_sourcetext,
953 : : publish_via_partition_root);
954 : :
1126 alvherre@alvh.no-ip. 955 : 208 : CheckPubRelationColumnList(stmt->pubname, rels,
956 : : schemaidlist != NIL,
957 : : publish_via_partition_root);
958 : :
1299 tomas.vondra@postgre 959 : 205 : PublicationAddTables(puboid, rels, true, NULL);
960 : 192 : CloseTableList(rels);
961 : : }
962 : :
1167 tgl@sss.pgh.pa.us 963 [ + + ]: 304 : if (schemaidlist != NIL)
964 : : {
965 : : /*
966 : : * Schema lock is held until the publication is created to prevent
967 : : * concurrent schema deletion.
968 : : */
1299 tomas.vondra@postgre 969 : 76 : LockSchemaList(schemaidlist);
970 : 76 : PublicationAddSchemas(puboid, schemaidlist, true, NULL);
971 : : }
972 : : }
973 : :
2471 andres@anarazel.de 974 : 369 : table_close(rel, RowExclusiveLock);
975 : :
3203 peter_e@gmx.net 976 [ - + ]: 369 : InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
977 : :
2298 tmunro@postgresql.or 978 [ + + ]: 369 : if (wal_level != WAL_LEVEL_LOGICAL)
979 [ + - ]: 224 : ereport(WARNING,
980 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
981 : : errmsg("\"wal_level\" is insufficient to publish logical changes"),
982 : : errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
983 : :
3203 peter_e@gmx.net 984 : 369 : return myself;
985 : : }
986 : :
987 : : /*
988 : : * Change options of a publication.
989 : : */
990 : : static void
1565 dean.a.rasheed@gmail 991 : 58 : AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
992 : : Relation rel, HeapTuple tup)
993 : : {
994 : : bool nulls[Natts_pg_publication];
995 : : bool replaces[Natts_pg_publication];
996 : : Datum values[Natts_pg_publication];
997 : : bool publish_given;
998 : : PublicationActions pubactions;
999 : : bool publish_via_partition_root_given;
1000 : : bool publish_via_partition_root;
1001 : : bool publish_generated_columns_given;
1002 : : char publish_generated_columns;
1003 : : ObjectAddress obj;
1004 : : Form_pg_publication pubform;
1343 akapila@postgresql.o 1005 : 58 : List *root_relids = NIL;
1006 : : ListCell *lc;
1007 : :
18 akapila@postgresql.o 1008 :GNC 58 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1009 : :
1565 dean.a.rasheed@gmail 1010 :CBC 58 : parse_publication_options(pstate,
1011 : : stmt->options,
1012 : : &publish_given, &pubactions,
1013 : : &publish_via_partition_root_given,
1014 : : &publish_via_partition_root,
1015 : : &publish_generated_columns_given,
1016 : : &publish_generated_columns);
1017 : :
18 akapila@postgresql.o 1018 [ - + ]:GNC 58 : if (pubform->puballsequences &&
18 akapila@postgresql.o 1019 [ # # # # :UNC 0 : (publish_given || publish_via_partition_root_given ||
# # ]
1020 : : publish_generated_columns_given))
1021 [ # # ]: 0 : ereport(NOTICE,
1022 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1023 : : errmsg("publication parameters are not applicable to sequence synchronization and will be ignored for sequences"));
1024 : :
1025 : : /*
1026 : : * If the publication doesn't publish changes via the root partitioned
1027 : : * table, the partition's row filter and column list will be used. So
1028 : : * disallow using WHERE clause and column lists on partitioned table in
1029 : : * this case.
1030 : : */
1343 akapila@postgresql.o 1031 [ + + + + ]:CBC 58 : if (!pubform->puballtables && publish_via_partition_root_given &&
1032 [ + + ]: 40 : !publish_via_partition_root)
1033 : : {
1034 : : /*
1035 : : * Lock the publication so nobody else can do anything with it. This
1036 : : * prevents concurrent alter to add partitioned table(s) with WHERE
1037 : : * clause(s) and/or column lists which we don't allow when not
1038 : : * publishing via root.
1039 : : */
1040 : 24 : LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
1041 : : AccessShareLock);
1042 : :
1043 : 24 : root_relids = GetPublicationRelations(pubform->oid,
1044 : : PUBLICATION_PART_ROOT);
1045 : :
1046 [ + - + + : 42 : foreach(lc, root_relids)
+ + ]
1047 : : {
1048 : 24 : Oid relid = lfirst_oid(lc);
1049 : : HeapTuple rftuple;
1050 : : char relkind;
1051 : : char *relname;
1052 : : bool has_rowfilter;
1053 : : bool has_collist;
1054 : :
1055 : : /*
1056 : : * Beware: we don't have lock on the relations, so cope silently
1057 : : * with the cache lookups returning NULL.
1058 : : */
1059 : :
1060 : 24 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1061 : : ObjectIdGetDatum(relid),
1062 : : ObjectIdGetDatum(pubform->oid));
1293 alvherre@alvh.no-ip. 1063 [ - + ]: 24 : if (!HeapTupleIsValid(rftuple))
1293 alvherre@alvh.no-ip. 1064 :UBC 0 : continue;
1293 alvherre@alvh.no-ip. 1065 :CBC 24 : has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
1066 : 24 : has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
1067 [ + + + + ]: 24 : if (!has_rowfilter && !has_collist)
1068 : : {
1069 : 6 : ReleaseSysCache(rftuple);
1070 : 6 : continue;
1071 : : }
1072 : :
1073 : 18 : relkind = get_rel_relkind(relid);
1074 [ + + ]: 18 : if (relkind != RELKIND_PARTITIONED_TABLE)
1075 : : {
1076 : 12 : ReleaseSysCache(rftuple);
1077 : 12 : continue;
1078 : : }
1079 : 6 : relname = get_rel_name(relid);
1080 [ - + ]: 6 : if (relname == NULL) /* table concurrently dropped */
1081 : : {
1343 akapila@postgresql.o 1082 :UBC 0 : ReleaseSysCache(rftuple);
1293 alvherre@alvh.no-ip. 1083 : 0 : continue;
1084 : : }
1085 : :
1293 alvherre@alvh.no-ip. 1086 [ + + ]:CBC 6 : if (has_rowfilter)
1087 [ + - ]: 3 : ereport(ERROR,
1088 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1089 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1090 : : "publish_via_partition_root",
1091 : : stmt->pubname),
1092 : : errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1093 : : relname, "publish_via_partition_root")));
1094 [ - + ]: 3 : Assert(has_collist);
1095 [ + - ]: 3 : ereport(ERROR,
1096 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1097 : : errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
1098 : : "publish_via_partition_root",
1099 : : stmt->pubname),
1100 : : errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
1101 : : relname, "publish_via_partition_root")));
1102 : : }
1103 : : }
1104 : :
1105 : : /* Everything ok, form a new tuple. */
3203 peter_e@gmx.net 1106 : 52 : memset(values, 0, sizeof(values));
1107 : 52 : memset(nulls, false, sizeof(nulls));
1108 : 52 : memset(replaces, false, sizeof(replaces));
1109 : :
3090 1110 [ + + ]: 52 : if (publish_given)
1111 : : {
2028 peter@eisentraut.org 1112 : 14 : values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
3203 peter_e@gmx.net 1113 : 14 : replaces[Anum_pg_publication_pubinsert - 1] = true;
1114 : :
2028 peter@eisentraut.org 1115 : 14 : values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
3203 peter_e@gmx.net 1116 : 14 : replaces[Anum_pg_publication_pubupdate - 1] = true;
1117 : :
2028 peter@eisentraut.org 1118 : 14 : values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
3203 peter_e@gmx.net 1119 : 14 : replaces[Anum_pg_publication_pubdelete - 1] = true;
1120 : :
2028 peter@eisentraut.org 1121 : 14 : values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
2760 peter_e@gmx.net 1122 : 14 : replaces[Anum_pg_publication_pubtruncate - 1] = true;
1123 : : }
1124 : :
2028 peter@eisentraut.org 1125 [ + + ]: 52 : if (publish_via_partition_root_given)
1126 : : {
1127 : 35 : values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1128 : 35 : replaces[Anum_pg_publication_pubviaroot - 1] = true;
1129 : : }
1130 : :
354 akapila@postgresql.o 1131 [ + + ]: 52 : if (publish_generated_columns_given)
1132 : : {
272 1133 : 3 : values[Anum_pg_publication_pubgencols - 1] = CharGetDatum(publish_generated_columns);
1134 : 3 : replaces[Anum_pg_publication_pubgencols - 1] = true;
1135 : : }
1136 : :
3203 peter_e@gmx.net 1137 : 52 : tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1138 : : replaces);
1139 : :
1140 : : /* Update the catalog. */
3191 alvherre@alvh.no-ip. 1141 : 52 : CatalogTupleUpdate(rel, &tup->t_self, tup);
1142 : :
3203 peter_e@gmx.net 1143 : 52 : CommandCounterIncrement();
1144 : :
2533 andres@anarazel.de 1145 : 52 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1146 : :
1147 : : /* Invalidate the relcache. */
1299 tomas.vondra@postgre 1148 [ + + ]: 52 : if (pubform->puballtables)
1149 : : {
3203 peter_e@gmx.net 1150 : 4 : CacheInvalidateRelcacheAll();
1151 : : }
1152 : : else
1153 : : {
1461 akapila@postgresql.o 1154 : 48 : List *relids = NIL;
1155 : 48 : List *schemarelids = NIL;
1156 : :
1157 : : /*
1158 : : * For any partitioned tables contained in the publication, we must
1159 : : * invalidate all partitions contained in the respective partition
1160 : : * trees, not just those explicitly mentioned in the publication.
1161 : : */
1343 1162 [ + + ]: 48 : if (root_relids == NIL)
1163 : 30 : relids = GetPublicationRelations(pubform->oid,
1164 : : PUBLICATION_PART_ALL);
1165 : : else
1166 : : {
1167 : : /*
1168 : : * We already got tables explicitly mentioned in the publication.
1169 : : * Now get all partitions for the partitioned table in the list.
1170 : : */
1171 [ + - + + : 36 : foreach(lc, root_relids)
+ + ]
1172 : 18 : relids = GetPubPartitionOptionRelations(relids,
1173 : : PUBLICATION_PART_ALL,
1174 : : lfirst_oid(lc));
1175 : : }
1176 : :
1461 1177 : 48 : schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1178 : : PUBLICATION_PART_ALL);
1179 : 48 : relids = list_concat_unique_oid(relids, schemarelids);
1180 : :
1496 1181 : 48 : InvalidatePublicationRels(relids);
1182 : : }
1183 : :
2533 andres@anarazel.de 1184 : 52 : ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
3203 peter_e@gmx.net 1185 : 52 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1186 : : (Node *) stmt);
1187 : :
2533 andres@anarazel.de 1188 [ - + ]: 52 : InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
3203 peter_e@gmx.net 1189 : 52 : }
1190 : :
1191 : : /*
1192 : : * Invalidate the relations.
1193 : : */
1194 : : void
1496 akapila@postgresql.o 1195 : 1221 : InvalidatePublicationRels(List *relids)
1196 : : {
1197 : : /*
1198 : : * We don't want to send too many individual messages, at some point it's
1199 : : * cheaper to just reset whole relcache.
1200 : : */
1201 [ + - ]: 1221 : if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1202 : : {
1203 : : ListCell *lc;
1204 : :
1205 [ + + + + : 10337 : foreach(lc, relids)
+ + ]
1206 : 9116 : CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1207 : : }
1208 : : else
1496 akapila@postgresql.o 1209 :UBC 0 : CacheInvalidateRelcacheAll();
1496 akapila@postgresql.o 1210 :CBC 1221 : }
1211 : :
1212 : : /*
1213 : : * Add or remove table to/from publication.
1214 : : */
1215 : : static void
1461 1216 : 459 : AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1217 : : List *tables, const char *queryString,
1218 : : bool publish_schema)
1219 : : {
3203 peter_e@gmx.net 1220 : 459 : List *rels = NIL;
1221 : 459 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
2533 andres@anarazel.de 1222 : 459 : Oid pubid = pubform->oid;
1223 : :
1224 : : /*
1225 : : * Nothing to do if no objects, except in SET: for that it is quite
1226 : : * possible that user has not specified any tables in which case we need
1227 : : * to remove all the existing tables.
1228 : : */
1393 alvherre@alvh.no-ip. 1229 [ + + + + ]: 459 : if (!tables && stmt->action != AP_SetObjects)
1461 akapila@postgresql.o 1230 : 38 : return;
1231 : :
1299 tomas.vondra@postgre 1232 : 421 : rels = OpenTableList(tables);
1233 : :
1393 alvherre@alvh.no-ip. 1234 [ + + ]: 421 : if (stmt->action == AP_AddObjects)
1235 : : {
1343 akapila@postgresql.o 1236 : 148 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1237 : :
1130 1238 : 139 : publish_schema |= is_schema_publication(pubid);
1239 : :
1126 alvherre@alvh.no-ip. 1240 : 139 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1130 akapila@postgresql.o 1241 : 139 : pubform->pubviaroot);
1242 : :
1299 tomas.vondra@postgre 1243 : 133 : PublicationAddTables(pubid, rels, false, stmt);
1244 : : }
1393 alvherre@alvh.no-ip. 1245 [ + + ]: 273 : else if (stmt->action == AP_DropObjects)
1299 tomas.vondra@postgre 1246 : 50 : PublicationDropTables(pubid, rels, false);
1247 : : else /* AP_SetObjects */
1248 : : {
1992 tgl@sss.pgh.pa.us 1249 : 223 : List *oldrelids = GetPublicationRelations(pubid,
1250 : : PUBLICATION_PART_ROOT);
3203 peter_e@gmx.net 1251 : 223 : List *delrels = NIL;
1252 : : ListCell *oldlc;
1253 : :
1343 akapila@postgresql.o 1254 : 223 : TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1255 : :
1126 alvherre@alvh.no-ip. 1256 : 214 : CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1130 akapila@postgresql.o 1257 : 214 : pubform->pubviaroot);
1258 : :
1259 : : /*
1260 : : * To recreate the relation list for the publication, look for
1261 : : * existing relations that do not need to be dropped.
1262 : : */
3203 peter_e@gmx.net 1263 [ + + + + : 403 : foreach(oldlc, oldrelids)
+ + ]
1264 : : {
1265 : 201 : Oid oldrelid = lfirst_oid(oldlc);
1266 : : ListCell *newlc;
1267 : : PublicationRelInfo *oldrel;
1268 : 201 : bool found = false;
1269 : : HeapTuple rftuple;
1343 akapila@postgresql.o 1270 : 201 : Node *oldrelwhereclause = NULL;
1311 tomas.vondra@postgre 1271 : 201 : Bitmapset *oldcolumns = NULL;
1272 : :
1273 : : /* look up the cache for the old relmap */
1343 akapila@postgresql.o 1274 : 201 : rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1275 : : ObjectIdGetDatum(oldrelid),
1276 : : ObjectIdGetDatum(pubid));
1277 : :
1278 : : /*
1279 : : * See if the existing relation currently has a WHERE clause or a
1280 : : * column list. We need to compare those too.
1281 : : */
1282 [ + - ]: 201 : if (HeapTupleIsValid(rftuple))
1283 : : {
1311 tomas.vondra@postgre 1284 : 201 : bool isnull = true;
1285 : : Datum whereClauseDatum;
1286 : : Datum columnListDatum;
1287 : :
1288 : : /* Load the WHERE clause for this table. */
1343 akapila@postgresql.o 1289 : 201 : whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1290 : : Anum_pg_publication_rel_prqual,
1291 : : &isnull);
1311 tomas.vondra@postgre 1292 [ + + ]: 201 : if (!isnull)
1343 akapila@postgresql.o 1293 : 105 : oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1294 : :
1295 : : /* Transform the int2vector column list to a bitmap. */
1311 tomas.vondra@postgre 1296 : 201 : columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1297 : : Anum_pg_publication_rel_prattrs,
1298 : : &isnull);
1299 : :
1300 [ + + ]: 201 : if (!isnull)
1301 : 68 : oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1302 : :
1343 akapila@postgresql.o 1303 : 201 : ReleaseSysCache(rftuple);
1304 : : }
1305 : :
3203 peter_e@gmx.net 1306 [ + + + + : 389 : foreach(newlc, rels)
+ + ]
1307 : : {
1308 : : PublicationRelInfo *newpubrel;
1309 : : Oid newrelid;
1264 tgl@sss.pgh.pa.us 1310 : 202 : Bitmapset *newcolumns = NULL;
1311 : :
1512 alvherre@alvh.no-ip. 1312 : 202 : newpubrel = (PublicationRelInfo *) lfirst(newlc);
1311 tomas.vondra@postgre 1313 : 202 : newrelid = RelationGetRelid(newpubrel->relation);
1314 : :
1315 : : /*
1316 : : * Validate the column list. If the column list or WHERE
1317 : : * clause changes, then the validation done here will be
1318 : : * duplicated inside PublicationAddTables(). The validation
1319 : : * is cheap enough that that seems harmless.
1320 : : */
438 drowley@postgresql.o 1321 : 202 : newcolumns = pub_collist_validate(newpubrel->relation,
1322 : : newpubrel->columns);
1323 : :
1324 : : /*
1325 : : * Check if any of the new set of relations matches with the
1326 : : * existing relations in the publication. Additionally, if the
1327 : : * relation has an associated WHERE clause, check the WHERE
1328 : : * expressions also match. Same for the column list. Drop the
1329 : : * rest.
1330 : : */
1331 [ + + ]: 196 : if (newrelid == oldrelid)
1332 : : {
1311 tomas.vondra@postgre 1333 [ + + + + ]: 142 : if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1334 : 40 : bms_equal(oldcolumns, newcolumns))
1335 : : {
1343 akapila@postgresql.o 1336 : 8 : found = true;
1337 : 8 : break;
1338 : : }
1339 : : }
1340 : : }
1341 : :
1342 : : /*
1343 : : * Add the non-matched relations to a list so that they can be
1344 : : * dropped.
1345 : : */
1346 [ + + ]: 195 : if (!found)
1347 : : {
1348 : 187 : oldrel = palloc(sizeof(PublicationRelInfo));
1349 : 187 : oldrel->whereClause = NULL;
1311 tomas.vondra@postgre 1350 : 187 : oldrel->columns = NIL;
1343 akapila@postgresql.o 1351 : 187 : oldrel->relation = table_open(oldrelid,
1352 : : ShareUpdateExclusiveLock);
1353 : 187 : delrels = lappend(delrels, oldrel);
1354 : : }
1355 : : }
1356 : :
1357 : : /* And drop them. */
1299 tomas.vondra@postgre 1358 : 202 : PublicationDropTables(pubid, delrels, true);
1359 : :
1360 : : /*
1361 : : * Don't bother calculating the difference for adding, we'll catch and
1362 : : * skip existing ones when doing catalog update.
1363 : : */
1364 : 202 : PublicationAddTables(pubid, rels, true, stmt);
1365 : :
1366 : 202 : CloseTableList(delrels);
1367 : : }
1368 : :
1369 : 355 : CloseTableList(rels);
1370 : : }
1371 : :
1372 : : /*
1373 : : * Alter the publication schemas.
1374 : : *
1375 : : * Add or remove schemas to/from publication.
1376 : : */
1377 : : static void
1461 akapila@postgresql.o 1378 : 393 : AlterPublicationSchemas(AlterPublicationStmt *stmt,
1379 : : HeapTuple tup, List *schemaidlist)
1380 : : {
1381 : 393 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1382 : :
1383 : : /*
1384 : : * Nothing to do if no objects, except in SET: for that it is quite
1385 : : * possible that user has not specified any schemas in which case we need
1386 : : * to remove all the existing schemas.
1387 : : */
1393 alvherre@alvh.no-ip. 1388 [ + + + + ]: 393 : if (!schemaidlist && stmt->action != AP_SetObjects)
1461 akapila@postgresql.o 1389 : 153 : return;
1390 : :
1391 : : /*
1392 : : * Schema lock is held until the publication is altered to prevent
1393 : : * concurrent schema deletion.
1394 : : */
1395 : 240 : LockSchemaList(schemaidlist);
1393 alvherre@alvh.no-ip. 1396 [ + + ]: 240 : if (stmt->action == AP_AddObjects)
1397 : : {
1398 : : ListCell *lc;
1399 : : List *reloids;
1400 : :
1299 tomas.vondra@postgre 1401 : 19 : reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1402 : :
1130 akapila@postgresql.o 1403 [ + + + + : 28 : foreach(lc, reloids)
+ + ]
1404 : : {
1405 : : HeapTuple coltuple;
1406 : :
1407 : 12 : coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1408 : : ObjectIdGetDatum(lfirst_oid(lc)),
1409 : : ObjectIdGetDatum(pubform->oid));
1410 : :
1411 [ - + ]: 12 : if (!HeapTupleIsValid(coltuple))
1130 akapila@postgresql.o 1412 :UBC 0 : continue;
1413 : :
1414 : : /*
1415 : : * Disallow adding schema if column list is already part of the
1416 : : * publication. See CheckPubRelationColumnList.
1417 : : */
1130 akapila@postgresql.o 1418 [ + + ]:CBC 12 : if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1419 [ + - ]: 3 : ereport(ERROR,
1420 : : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1421 : : errmsg("cannot add schema to publication \"%s\"",
1422 : : stmt->pubname),
1423 : : errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1424 : :
1425 : 9 : ReleaseSysCache(coltuple);
1426 : : }
1427 : :
1299 tomas.vondra@postgre 1428 : 16 : PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1429 : : }
1393 alvherre@alvh.no-ip. 1430 [ + + ]: 221 : else if (stmt->action == AP_DropObjects)
1299 tomas.vondra@postgre 1431 : 19 : PublicationDropSchemas(pubform->oid, schemaidlist, false);
1432 : : else /* AP_SetObjects */
1433 : : {
1434 : 202 : List *oldschemaids = GetPublicationSchemas(pubform->oid);
1461 akapila@postgresql.o 1435 : 202 : List *delschemas = NIL;
1436 : :
1437 : : /* Identify which schemas should be dropped */
1438 : 202 : delschemas = list_difference_oid(oldschemaids, schemaidlist);
1439 : :
1440 : : /*
1441 : : * Schema lock is held until the publication is altered to prevent
1442 : : * concurrent schema deletion.
1443 : : */
1444 : 202 : LockSchemaList(delschemas);
1445 : :
1446 : : /* And drop them */
1299 tomas.vondra@postgre 1447 : 202 : PublicationDropSchemas(pubform->oid, delschemas, true);
1448 : :
1449 : : /*
1450 : : * Don't bother calculating the difference for adding, we'll catch and
1451 : : * skip existing ones when doing catalog update.
1452 : : */
1453 : 202 : PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1454 : : }
1455 : : }
1456 : :
1457 : : /*
1458 : : * Check if relations and schemas can be in a given publication and throw
1459 : : * appropriate error if not.
1460 : : */
1461 : : static void
1461 akapila@postgresql.o 1462 : 480 : CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1463 : : List *tables, List *schemaidlist)
1464 : : {
1465 : 480 : Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1466 : :
1393 alvherre@alvh.no-ip. 1467 [ + + + + : 480 : if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
+ + ]
1299 tomas.vondra@postgre 1468 [ + + ]: 52 : schemaidlist && !superuser())
1461 akapila@postgresql.o 1469 [ + - ]: 3 : ereport(ERROR,
1470 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1471 : : errmsg("must be superuser to add or set schemas")));
1472 : :
1473 : : /*
1474 : : * Check that user is allowed to manipulate the publication tables in
1475 : : * schema
1476 : : */
18 akapila@postgresql.o 1477 [ + + + + :GNC 477 : if (schemaidlist && (pubform->puballtables || pubform->puballsequences))
- + ]
1478 : : {
1479 [ + - - + ]: 9 : if (pubform->puballtables && pubform->puballsequences)
18 akapila@postgresql.o 1480 [ # # ]:UNC 0 : ereport(ERROR,
1481 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1482 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1483 : : NameStr(pubform->pubname)),
1484 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
18 akapila@postgresql.o 1485 [ + - ]:GNC 9 : else if (pubform->puballtables)
1486 [ + - ]: 9 : ereport(ERROR,
1487 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1488 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1489 : : NameStr(pubform->pubname)),
1490 : : errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."));
1491 : : else
18 akapila@postgresql.o 1492 [ # # ]:UNC 0 : ereport(ERROR,
1493 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1494 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1495 : : NameStr(pubform->pubname)),
1496 : : errdetail("Schemas cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1497 : : }
1498 : :
1499 : : /* Check that user is allowed to manipulate the publication tables. */
18 akapila@postgresql.o 1500 [ + + + + :GNC 468 : if (tables && (pubform->puballtables || pubform->puballsequences))
- + ]
1501 : : {
1502 [ + - - + ]: 9 : if (pubform->puballtables && pubform->puballsequences)
18 akapila@postgresql.o 1503 [ # # ]:UNC 0 : ereport(ERROR,
1504 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1505 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES, ALL SEQUENCES",
1506 : : NameStr(pubform->pubname)),
1507 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES, ALL SEQUENCES publications."));
18 akapila@postgresql.o 1508 [ + - ]:GNC 9 : else if (pubform->puballtables)
1509 [ + - ]: 9 : ereport(ERROR,
1510 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1511 : : errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1512 : : NameStr(pubform->pubname)),
1513 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL TABLES publications."));
1514 : : else
18 akapila@postgresql.o 1515 [ # # ]:UNC 0 : ereport(ERROR,
1516 : : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1517 : : errmsg("publication \"%s\" is defined as FOR ALL SEQUENCES",
1518 : : NameStr(pubform->pubname)),
1519 : : errdetail("Tables or sequences cannot be added to or dropped from FOR ALL SEQUENCES publications."));
1520 : : }
1461 akapila@postgresql.o 1521 :CBC 459 : }
1522 : :
1523 : : /*
1524 : : * Alter the existing publication.
1525 : : *
1526 : : * This is dispatcher function for AlterPublicationOptions,
1527 : : * AlterPublicationSchemas and AlterPublicationTables.
1528 : : */
1529 : : void
1565 dean.a.rasheed@gmail 1530 : 547 : AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1531 : : {
1532 : : Relation rel;
1533 : : HeapTuple tup;
1534 : : Form_pg_publication pubform;
1535 : :
2471 andres@anarazel.de 1536 : 547 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1537 : :
3203 peter_e@gmx.net 1538 : 547 : tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1539 : : CStringGetDatum(stmt->pubname));
1540 : :
1541 [ - + ]: 547 : if (!HeapTupleIsValid(tup))
3203 peter_e@gmx.net 1542 [ # # ]:UBC 0 : ereport(ERROR,
1543 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1544 : : errmsg("publication \"%s\" does not exist",
1545 : : stmt->pubname)));
1546 : :
2533 andres@anarazel.de 1547 :CBC 547 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1548 : :
1549 : : /* must be owner */
1079 peter@eisentraut.org 1550 [ - + ]: 547 : if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
2886 peter_e@gmx.net 1551 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
3203 1552 : 0 : stmt->pubname);
1553 : :
3203 peter_e@gmx.net 1554 [ + + ]:CBC 547 : if (stmt->options)
1565 dean.a.rasheed@gmail 1555 : 58 : AlterPublicationOptions(pstate, stmt, rel, tup);
1556 : : else
1557 : : {
1299 tomas.vondra@postgre 1558 : 489 : List *relations = NIL;
1559 : 489 : List *schemaidlist = NIL;
1343 akapila@postgresql.o 1560 : 489 : Oid pubid = pubform->oid;
1561 : :
1299 tomas.vondra@postgre 1562 : 489 : ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1563 : : &schemaidlist);
1564 : :
1565 : 480 : CheckAlterPublication(stmt, tup, relations, schemaidlist);
1566 : :
1343 akapila@postgresql.o 1567 : 459 : heap_freetuple(tup);
1568 : :
1569 : : /* Lock the publication so nobody else can do anything with it. */
1570 : 459 : LockDatabaseObject(PublicationRelationId, pubid, 0,
1571 : : AccessExclusiveLock);
1572 : :
1573 : : /*
1574 : : * It is possible that by the time we acquire the lock on publication,
1575 : : * concurrent DDL has removed it. We can test this by checking the
1576 : : * existence of publication. We get the tuple again to avoid the risk
1577 : : * of any publication option getting changed.
1578 : : */
1579 : 459 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1580 [ - + ]: 459 : if (!HeapTupleIsValid(tup))
1461 akapila@postgresql.o 1581 [ # # ]:UBC 0 : ereport(ERROR,
1582 : : errcode(ERRCODE_UNDEFINED_OBJECT),
1583 : : errmsg("publication \"%s\" does not exist",
1584 : : stmt->pubname));
1585 : :
1130 akapila@postgresql.o 1586 :CBC 459 : AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1587 : : schemaidlist != NIL);
1299 tomas.vondra@postgre 1588 : 393 : AlterPublicationSchemas(stmt, tup, schemaidlist);
1589 : : }
1590 : :
1591 : : /* Cleanup. */
3203 peter_e@gmx.net 1592 : 436 : heap_freetuple(tup);
2471 andres@anarazel.de 1593 : 436 : table_close(rel, RowExclusiveLock);
3203 peter_e@gmx.net 1594 : 436 : }
1595 : :
1596 : : /*
1597 : : * Remove relation from publication by mapping OID.
1598 : : */
1599 : : void
1600 : 423 : RemovePublicationRelById(Oid proid)
1601 : : {
1602 : : Relation rel;
1603 : : HeapTuple tup;
1604 : : Form_pg_publication_rel pubrel;
1496 akapila@postgresql.o 1605 : 423 : List *relids = NIL;
1606 : :
2471 andres@anarazel.de 1607 : 423 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1608 : :
3203 peter_e@gmx.net 1609 : 423 : tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1610 : :
1611 [ - + ]: 423 : if (!HeapTupleIsValid(tup))
3203 peter_e@gmx.net 1612 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication table %u",
1613 : : proid);
1614 : :
3203 peter_e@gmx.net 1615 :CBC 423 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1616 : :
1617 : : /*
1618 : : * Invalidate relcache so that publication info is rebuilt.
1619 : : *
1620 : : * For the partitioned tables, we must invalidate all partitions contained
1621 : : * in the respective partition hierarchies, not just the one explicitly
1622 : : * mentioned in the publication. This is required because we implicitly
1623 : : * publish the child tables when the parent table is published.
1624 : : */
1496 akapila@postgresql.o 1625 : 423 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1626 : : pubrel->prrelid);
1627 : :
1628 : 423 : InvalidatePublicationRels(relids);
1629 : :
3190 tgl@sss.pgh.pa.us 1630 : 423 : CatalogTupleDelete(rel, &tup->t_self);
1631 : :
3203 peter_e@gmx.net 1632 : 423 : ReleaseSysCache(tup);
1633 : :
2471 andres@anarazel.de 1634 : 423 : table_close(rel, RowExclusiveLock);
3203 peter_e@gmx.net 1635 : 423 : }
1636 : :
1637 : : /*
1638 : : * Remove the publication by mapping OID.
1639 : : */
1640 : : void
1510 akapila@postgresql.o 1641 : 256 : RemovePublicationById(Oid pubid)
1642 : : {
1643 : : Relation rel;
1644 : : HeapTuple tup;
1645 : : Form_pg_publication pubform;
1646 : :
1647 : 256 : rel = table_open(PublicationRelationId, RowExclusiveLock);
1648 : :
1649 : 256 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1650 [ - + ]: 256 : if (!HeapTupleIsValid(tup))
1510 akapila@postgresql.o 1651 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1652 : :
1397 alvherre@alvh.no-ip. 1653 :CBC 256 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1654 : :
1655 : : /* Invalidate relcache so that publication info is rebuilt. */
1299 tomas.vondra@postgre 1656 [ + + ]: 256 : if (pubform->puballtables)
1510 akapila@postgresql.o 1657 : 39 : CacheInvalidateRelcacheAll();
1658 : :
1659 : 256 : CatalogTupleDelete(rel, &tup->t_self);
1660 : :
1661 : 256 : ReleaseSysCache(tup);
1662 : :
1663 : 256 : table_close(rel, RowExclusiveLock);
1664 : 256 : }
1665 : :
1666 : : /*
1667 : : * Remove schema from publication by mapping OID.
1668 : : */
1669 : : void
1461 1670 : 96 : RemovePublicationSchemaById(Oid psoid)
1671 : : {
1672 : : Relation rel;
1673 : : HeapTuple tup;
1674 : 96 : List *schemaRels = NIL;
1675 : : Form_pg_publication_namespace pubsch;
1676 : :
1677 : 96 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1678 : :
1679 : 96 : tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1680 : :
1681 [ - + ]: 96 : if (!HeapTupleIsValid(tup))
1461 akapila@postgresql.o 1682 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1683 : :
1461 akapila@postgresql.o 1684 :CBC 96 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1685 : :
1686 : : /*
1687 : : * Invalidate relcache so that publication info is rebuilt. See
1688 : : * RemovePublicationRelById for why we need to consider all the
1689 : : * partitions.
1690 : : */
1691 : 96 : schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1692 : : PUBLICATION_PART_ALL);
1693 : 96 : InvalidatePublicationRels(schemaRels);
1694 : :
1695 : 96 : CatalogTupleDelete(rel, &tup->t_self);
1696 : :
1697 : 96 : ReleaseSysCache(tup);
1698 : :
1699 : 96 : table_close(rel, RowExclusiveLock);
1700 : 96 : }
1701 : :
1702 : : /*
1703 : : * Open relations specified by a PublicationTable list.
1704 : : * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1705 : : * add them to a publication.
1706 : : */
1707 : : static List *
1299 tomas.vondra@postgre 1708 : 656 : OpenTableList(List *tables)
1709 : : {
3203 peter_e@gmx.net 1710 : 656 : List *relids = NIL;
1299 tomas.vondra@postgre 1711 : 656 : List *rels = NIL;
1712 : : ListCell *lc;
1343 akapila@postgresql.o 1713 : 656 : List *relids_with_rf = NIL;
1311 tomas.vondra@postgre 1714 : 656 : List *relids_with_collist = NIL;
1715 : :
1716 : : /*
1717 : : * Open, share-lock, and check all the explicitly-specified relations
1718 : : */
1299 1719 [ + + + + : 1345 : foreach(lc, tables)
+ + ]
1720 : : {
1512 alvherre@alvh.no-ip. 1721 : 704 : PublicationTable *t = lfirst_node(PublicationTable, lc);
1722 : 704 : bool recurse = t->relation->inh;
1723 : : Relation rel;
1724 : : Oid myrelid;
1725 : : PublicationRelInfo *pub_rel;
1726 : :
1727 : : /* Allow query cancel in case this takes a long time */
3203 peter_e@gmx.net 1728 [ - + ]: 704 : CHECK_FOR_INTERRUPTS();
1729 : :
1512 alvherre@alvh.no-ip. 1730 : 704 : rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
3203 peter_e@gmx.net 1731 : 701 : myrelid = RelationGetRelid(rel);
1732 : :
1733 : : /*
1734 : : * Filter out duplicates if user specifies "foo, foo".
1735 : : *
1736 : : * Note that this algorithm is known to not be very efficient (O(N^2))
1737 : : * but given that it only works on list of tables given to us by user
1738 : : * it's deemed acceptable.
1739 : : */
1740 [ + + ]: 701 : if (list_member_oid(relids, myrelid))
1741 : : {
1742 : : /* Disallow duplicate tables if there are any with row filters. */
1343 akapila@postgresql.o 1743 [ + + + + ]: 12 : if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1744 [ + - ]: 6 : ereport(ERROR,
1745 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1746 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1747 : : RelationGetRelationName(rel))));
1748 : :
1749 : : /* Disallow duplicate tables if there are any with column lists. */
1311 tomas.vondra@postgre 1750 [ + + + - ]: 6 : if (t->columns || list_member_oid(relids_with_collist, myrelid))
1751 [ + - ]: 6 : ereport(ERROR,
1752 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1753 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1754 : : RelationGetRelationName(rel))));
1755 : :
2471 andres@anarazel.de 1756 :UBC 0 : table_close(rel, ShareUpdateExclusiveLock);
3203 peter_e@gmx.net 1757 : 0 : continue;
1758 : : }
1759 : :
1512 alvherre@alvh.no-ip. 1760 :CBC 689 : pub_rel = palloc(sizeof(PublicationRelInfo));
1761 : 689 : pub_rel->relation = rel;
1343 akapila@postgresql.o 1762 : 689 : pub_rel->whereClause = t->whereClause;
1311 tomas.vondra@postgre 1763 : 689 : pub_rel->columns = t->columns;
1299 1764 : 689 : rels = lappend(rels, pub_rel);
3203 peter_e@gmx.net 1765 : 689 : relids = lappend_oid(relids, myrelid);
1766 : :
1343 akapila@postgresql.o 1767 [ + + ]: 689 : if (t->whereClause)
1768 : 202 : relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1769 : :
1311 tomas.vondra@postgre 1770 [ + + ]: 689 : if (t->columns)
1771 : 205 : relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1772 : :
1773 : : /*
1774 : : * Add children of this rel, if requested, so that they too are added
1775 : : * to the publication. A partitioned table can't have any inheritance
1776 : : * children other than its partitions, which need not be explicitly
1777 : : * added to the publication.
1778 : : */
2057 peter@eisentraut.org 1779 [ + + + + ]: 689 : if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1780 : : {
1781 : : List *children;
1782 : : ListCell *child;
1783 : :
3203 peter_e@gmx.net 1784 : 577 : children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1785 : : NULL);
1786 : :
1787 [ + - + + : 1158 : foreach(child, children)
+ + ]
1788 : : {
1789 : 581 : Oid childrelid = lfirst_oid(child);
1790 : :
1791 : : /* Allow query cancel in case this takes a long time */
2516 tgl@sss.pgh.pa.us 1792 [ - + ]: 581 : CHECK_FOR_INTERRUPTS();
1793 : :
1794 : : /*
1795 : : * Skip duplicates if user specified both parent and child
1796 : : * tables.
1797 : : */
3203 peter_e@gmx.net 1798 [ + + ]: 581 : if (list_member_oid(relids, childrelid))
1799 : : {
1800 : : /*
1801 : : * We don't allow to specify row filter for both parent
1802 : : * and child table at the same time as it is not very
1803 : : * clear which one should be given preference.
1804 : : */
1343 akapila@postgresql.o 1805 [ - + ]: 577 : if (childrelid != myrelid &&
1343 akapila@postgresql.o 1806 [ # # # # ]:UBC 0 : (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1807 [ # # ]: 0 : ereport(ERROR,
1808 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1809 : : errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1810 : : RelationGetRelationName(rel))));
1811 : :
1812 : : /*
1813 : : * We don't allow to specify column list for both parent
1814 : : * and child table at the same time as it is not very
1815 : : * clear which one should be given preference.
1816 : : */
1311 tomas.vondra@postgre 1817 [ - + ]:CBC 577 : if (childrelid != myrelid &&
1311 tomas.vondra@postgre 1818 [ # # # # ]:UBC 0 : (t->columns || list_member_oid(relids_with_collist, childrelid)))
1819 [ # # ]: 0 : ereport(ERROR,
1820 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
1821 : : errmsg("conflicting or redundant column lists for table \"%s\"",
1822 : : RelationGetRelationName(rel))));
1823 : :
3203 peter_e@gmx.net 1824 :CBC 577 : continue;
1825 : : }
1826 : :
1827 : : /* find_all_inheritors already got lock */
2471 andres@anarazel.de 1828 : 4 : rel = table_open(childrelid, NoLock);
1512 alvherre@alvh.no-ip. 1829 : 4 : pub_rel = palloc(sizeof(PublicationRelInfo));
1830 : 4 : pub_rel->relation = rel;
1831 : : /* child inherits WHERE clause from parent */
1343 akapila@postgresql.o 1832 : 4 : pub_rel->whereClause = t->whereClause;
1833 : :
1834 : : /* child inherits column list from parent */
1311 tomas.vondra@postgre 1835 : 4 : pub_rel->columns = t->columns;
1299 1836 : 4 : rels = lappend(rels, pub_rel);
3203 peter_e@gmx.net 1837 : 4 : relids = lappend_oid(relids, childrelid);
1838 : :
1343 akapila@postgresql.o 1839 [ + + ]: 4 : if (t->whereClause)
1840 : 1 : relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1841 : :
1311 tomas.vondra@postgre 1842 [ - + ]: 4 : if (t->columns)
1311 tomas.vondra@postgre 1843 :UBC 0 : relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1844 : : }
1845 : : }
1846 : : }
1847 : :
3203 peter_e@gmx.net 1848 :CBC 641 : list_free(relids);
1343 akapila@postgresql.o 1849 : 641 : list_free(relids_with_rf);
1850 : :
1299 tomas.vondra@postgre 1851 : 641 : return rels;
1852 : : }
1853 : :
1854 : : /*
1855 : : * Close all relations in the list.
1856 : : */
1857 : : static void
1858 : 749 : CloseTableList(List *rels)
1859 : : {
1860 : : ListCell *lc;
1861 : :
3203 peter_e@gmx.net 1862 [ + + + + : 1523 : foreach(lc, rels)
+ + ]
1863 : : {
1864 : : PublicationRelInfo *pub_rel;
1865 : :
1512 alvherre@alvh.no-ip. 1866 : 774 : pub_rel = (PublicationRelInfo *) lfirst(lc);
1867 : 774 : table_close(pub_rel->relation, NoLock);
1868 : : }
1869 : :
1343 akapila@postgresql.o 1870 : 749 : list_free_deep(rels);
3203 peter_e@gmx.net 1871 : 749 : }
1872 : :
1873 : : /*
1874 : : * Lock the schemas specified in the schema list in AccessShareLock mode in
1875 : : * order to prevent concurrent schema deletion.
1876 : : */
1877 : : static void
1461 akapila@postgresql.o 1878 : 518 : LockSchemaList(List *schemalist)
1879 : : {
1880 : : ListCell *lc;
1881 : :
1882 [ + + + + : 680 : foreach(lc, schemalist)
+ + ]
1883 : : {
1884 : 162 : Oid schemaid = lfirst_oid(lc);
1885 : :
1886 : : /* Allow query cancel in case this takes a long time */
1887 [ - + ]: 162 : CHECK_FOR_INTERRUPTS();
1888 : 162 : LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1889 : :
1890 : : /*
1891 : : * It is possible that by the time we acquire the lock on schema,
1892 : : * concurrent DDL has removed it. We can test this by checking the
1893 : : * existence of schema.
1894 : : */
1895 [ - + ]: 162 : if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1461 akapila@postgresql.o 1896 [ # # ]:UBC 0 : ereport(ERROR,
1897 : : errcode(ERRCODE_UNDEFINED_SCHEMA),
1898 : : errmsg("schema with OID %u does not exist", schemaid));
1899 : : }
1461 akapila@postgresql.o 1900 :CBC 518 : }
1901 : :
1902 : : /*
1903 : : * Add listed tables to the publication.
1904 : : */
1905 : : static void
1299 tomas.vondra@postgre 1906 : 540 : PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1907 : : AlterPublicationStmt *stmt)
1908 : : {
1909 : : ListCell *lc;
1910 : :
3203 peter_e@gmx.net 1911 [ + + + + : 1083 : foreach(lc, rels)
+ + ]
1912 : : {
1512 alvherre@alvh.no-ip. 1913 : 577 : PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1914 : 577 : Relation rel = pub_rel->relation;
1915 : : ObjectAddress obj;
1916 : :
1917 : : /* Must be owner of the table or superuser. */
1079 peter@eisentraut.org 1918 [ + + ]: 577 : if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
2886 peter_e@gmx.net 1919 : 3 : aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
3203 1920 : 3 : RelationGetRelationName(rel));
1921 : :
1512 alvherre@alvh.no-ip. 1922 : 574 : obj = publication_add_relation(pubid, pub_rel, if_not_exists);
3203 peter_e@gmx.net 1923 [ + + ]: 543 : if (stmt)
1924 : : {
1925 : 314 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1926 : : (Node *) stmt);
1927 : :
1928 [ - + ]: 314 : InvokeObjectPostCreateHook(PublicationRelRelationId,
1929 : : obj.objectId, 0);
1930 : : }
1931 : : }
1932 : 506 : }
1933 : :
1934 : : /*
1935 : : * Remove listed tables from the publication.
1936 : : */
1937 : : static void
1299 tomas.vondra@postgre 1938 : 252 : PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1939 : : {
1940 : : ObjectAddress obj;
1941 : : ListCell *lc;
1942 : : Oid prid;
1943 : :
3203 peter_e@gmx.net 1944 [ + + + + : 483 : foreach(lc, rels)
+ + ]
1945 : : {
1512 alvherre@alvh.no-ip. 1946 : 240 : PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1947 : 240 : Relation rel = pubrel->relation;
3203 peter_e@gmx.net 1948 : 240 : Oid relid = RelationGetRelid(rel);
1949 : :
1311 tomas.vondra@postgre 1950 [ - + ]: 240 : if (pubrel->columns)
1311 tomas.vondra@postgre 1951 [ # # ]:UBC 0 : ereport(ERROR,
1952 : : errcode(ERRCODE_SYNTAX_ERROR),
1953 : : errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1954 : :
2533 andres@anarazel.de 1955 :CBC 240 : prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1956 : : ObjectIdGetDatum(relid),
1957 : : ObjectIdGetDatum(pubid));
3203 peter_e@gmx.net 1958 [ + + ]: 240 : if (!OidIsValid(prid))
1959 : : {
1960 [ - + ]: 6 : if (missing_ok)
3203 peter_e@gmx.net 1961 :UBC 0 : continue;
1962 : :
3203 peter_e@gmx.net 1963 [ + - ]:CBC 6 : ereport(ERROR,
1964 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
1965 : : errmsg("relation \"%s\" is not part of the publication",
1966 : : RelationGetRelationName(rel))));
1967 : : }
1968 : :
1343 akapila@postgresql.o 1969 [ + + ]: 234 : if (pubrel->whereClause)
1970 [ + - ]: 3 : ereport(ERROR,
1971 : : (errcode(ERRCODE_SYNTAX_ERROR),
1972 : : errmsg("cannot use a WHERE clause when removing a table from a publication")));
1973 : :
3203 peter_e@gmx.net 1974 : 231 : ObjectAddressSet(obj, PublicationRelRelationId, prid);
1975 : 231 : performDeletion(&obj, DROP_CASCADE, 0);
1976 : : }
1977 : 243 : }
1978 : :
1979 : : /*
1980 : : * Add listed schemas to the publication.
1981 : : */
1982 : : static void
1299 tomas.vondra@postgre 1983 : 294 : PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1984 : : AlterPublicationStmt *stmt)
1985 : : {
1986 : : ListCell *lc;
1987 : :
1461 akapila@postgresql.o 1988 [ + + + + : 419 : foreach(lc, schemas)
+ + ]
1989 : : {
1990 : 131 : Oid schemaid = lfirst_oid(lc);
1991 : : ObjectAddress obj;
1992 : :
1299 tomas.vondra@postgre 1993 : 131 : obj = publication_add_schema(pubid, schemaid, if_not_exists);
1461 akapila@postgresql.o 1994 [ + + ]: 125 : if (stmt)
1995 : : {
1996 : 34 : EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1997 : : (Node *) stmt);
1998 : :
1999 [ - + ]: 34 : InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
2000 : : obj.objectId, 0);
2001 : : }
2002 : : }
2003 : 288 : }
2004 : :
2005 : : /*
2006 : : * Remove listed schemas from the publication.
2007 : : */
2008 : : static void
1299 tomas.vondra@postgre 2009 : 221 : PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
2010 : : {
2011 : : ObjectAddress obj;
2012 : : ListCell *lc;
2013 : : Oid psid;
2014 : :
1461 akapila@postgresql.o 2015 [ + + + + : 246 : foreach(lc, schemas)
+ + ]
2016 : : {
2017 : 28 : Oid schemaid = lfirst_oid(lc);
2018 : :
1299 tomas.vondra@postgre 2019 : 28 : psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
2020 : : Anum_pg_publication_namespace_oid,
2021 : : ObjectIdGetDatum(schemaid),
2022 : : ObjectIdGetDatum(pubid));
1461 akapila@postgresql.o 2023 [ + + ]: 28 : if (!OidIsValid(psid))
2024 : : {
2025 [ - + ]: 3 : if (missing_ok)
1461 akapila@postgresql.o 2026 :UBC 0 : continue;
2027 : :
1461 akapila@postgresql.o 2028 [ + - ]:CBC 3 : ereport(ERROR,
2029 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2030 : : errmsg("tables from schema \"%s\" are not part of the publication",
2031 : : get_namespace_name(schemaid))));
2032 : : }
2033 : :
2034 : 25 : ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
2035 : 25 : performDeletion(&obj, DROP_CASCADE, 0);
2036 : : }
2037 : 218 : }
2038 : :
2039 : : /*
2040 : : * Internal workhorse for changing a publication owner
2041 : : */
2042 : : static void
3203 peter_e@gmx.net 2043 : 18 : AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
2044 : : {
2045 : : Form_pg_publication form;
2046 : :
2047 : 18 : form = (Form_pg_publication) GETSTRUCT(tup);
2048 : :
2049 [ + + ]: 18 : if (form->pubowner == newOwnerId)
2050 : 6 : return;
2051 : :
3178 2052 [ + + ]: 12 : if (!superuser())
2053 : : {
2054 : : AclResult aclresult;
2055 : :
2056 : : /* Must be owner */
1079 peter@eisentraut.org 2057 [ - + ]: 6 : if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
2886 peter_e@gmx.net 2058 :UBC 0 : aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
3178 2059 : 0 : NameStr(form->pubname));
2060 : :
2061 : : /* Must be able to become new owner */
1074 rhaas@postgresql.org 2062 :CBC 6 : check_can_set_role(GetUserId(), newOwnerId);
2063 : :
2064 : : /* New owner must have CREATE privilege on database */
1079 peter@eisentraut.org 2065 : 6 : aclresult = object_aclcheck(DatabaseRelationId, MyDatabaseId, newOwnerId, ACL_CREATE);
3178 peter_e@gmx.net 2066 [ - + ]: 6 : if (aclresult != ACLCHECK_OK)
2886 peter_e@gmx.net 2067 :UBC 0 : aclcheck_error(aclresult, OBJECT_DATABASE,
3178 2068 : 0 : get_database_name(MyDatabaseId));
2069 : :
18 akapila@postgresql.o 2070 [ + + ]:GNC 6 : if (!superuser_arg(newOwnerId))
2071 : : {
2072 [ + - + - : 6 : if (form->puballtables || form->puballsequences ||
+ - ]
2073 : 3 : is_schema_publication(form->oid))
2074 [ + - ]: 3 : ereport(ERROR,
2075 : : errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
2076 : : errmsg("permission denied to change owner of publication \"%s\"",
2077 : : NameStr(form->pubname)),
2078 : : errhint("The owner of a FOR ALL TABLES or ALL SEQUENCES or TABLES IN SCHEMA publication must be a superuser."));
2079 : : }
2080 : : }
2081 : :
3203 peter_e@gmx.net 2082 :CBC 9 : form->pubowner = newOwnerId;
3191 alvherre@alvh.no-ip. 2083 : 9 : CatalogTupleUpdate(rel, &tup->t_self, tup);
2084 : :
2085 : : /* Update owner dependency reference */
3203 peter_e@gmx.net 2086 : 9 : changeDependencyOnOwner(PublicationRelationId,
2087 : : form->oid,
2088 : : newOwnerId);
2089 : :
2090 [ - + ]: 9 : InvokeObjectPostAlterHook(PublicationRelationId,
2091 : : form->oid, 0);
2092 : : }
2093 : :
2094 : : /*
2095 : : * Change publication owner -- by name
2096 : : */
2097 : : ObjectAddress
2098 : 18 : AlterPublicationOwner(const char *name, Oid newOwnerId)
2099 : : {
2100 : : Oid pubid;
2101 : : HeapTuple tup;
2102 : : Relation rel;
2103 : : ObjectAddress address;
2104 : : Form_pg_publication pubform;
2105 : :
2471 andres@anarazel.de 2106 : 18 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2107 : :
3203 peter_e@gmx.net 2108 : 18 : tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
2109 : :
2110 [ - + ]: 18 : if (!HeapTupleIsValid(tup))
3203 peter_e@gmx.net 2111 [ # # ]:UBC 0 : ereport(ERROR,
2112 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2113 : : errmsg("publication \"%s\" does not exist", name)));
2114 : :
2533 andres@anarazel.de 2115 :CBC 18 : pubform = (Form_pg_publication) GETSTRUCT(tup);
223 akapila@postgresql.o 2116 : 18 : pubid = pubform->oid;
2117 : :
3203 peter_e@gmx.net 2118 : 18 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2119 : :
223 akapila@postgresql.o 2120 : 15 : ObjectAddressSet(address, PublicationRelationId, pubid);
2121 : :
3203 peter_e@gmx.net 2122 : 15 : heap_freetuple(tup);
2123 : :
2471 andres@anarazel.de 2124 : 15 : table_close(rel, RowExclusiveLock);
2125 : :
3203 peter_e@gmx.net 2126 : 15 : return address;
2127 : : }
2128 : :
2129 : : /*
2130 : : * Change publication owner -- by OID
2131 : : */
2132 : : void
223 akapila@postgresql.o 2133 :UBC 0 : AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId)
2134 : : {
2135 : : HeapTuple tup;
2136 : : Relation rel;
2137 : :
2471 andres@anarazel.de 2138 : 0 : rel = table_open(PublicationRelationId, RowExclusiveLock);
2139 : :
223 akapila@postgresql.o 2140 : 0 : tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
2141 : :
3203 peter_e@gmx.net 2142 [ # # ]: 0 : if (!HeapTupleIsValid(tup))
2143 [ # # ]: 0 : ereport(ERROR,
2144 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
2145 : : errmsg("publication with OID %u does not exist", pubid)));
2146 : :
2147 : 0 : AlterPublicationOwner_internal(rel, tup, newOwnerId);
2148 : :
2149 : 0 : heap_freetuple(tup);
2150 : :
2471 andres@anarazel.de 2151 : 0 : table_close(rel, RowExclusiveLock);
3203 peter_e@gmx.net 2152 : 0 : }
2153 : :
2154 : : /*
2155 : : * Extract the publish_generated_columns option value from a DefElem. "stored"
2156 : : * and "none" values are accepted.
2157 : : */
2158 : : static char
277 akapila@postgresql.o 2159 :CBC 38 : defGetGeneratedColsOption(DefElem *def)
2160 : : {
83 2161 : 38 : char *sval = "";
2162 : :
2163 : : /*
2164 : : * A parameter value is required.
2165 : : */
2166 [ + + ]: 38 : if (def->arg)
2167 : : {
2168 : 35 : sval = defGetString(def);
2169 : :
2170 [ + + ]: 35 : if (pg_strcasecmp(sval, "none") == 0)
2171 : 11 : return PUBLISH_GENCOLS_NONE;
2172 [ + + ]: 24 : if (pg_strcasecmp(sval, "stored") == 0)
2173 : 21 : return PUBLISH_GENCOLS_STORED;
2174 : : }
2175 : :
277 2176 [ + - ]: 6 : ereport(ERROR,
2177 : : errcode(ERRCODE_SYNTAX_ERROR),
2178 : : errmsg("invalid value for publication parameter \"%s\": \"%s\"", def->defname, sval),
2179 : : errdetail("Valid values are \"%s\" and \"%s\".", "none", "stored"));
2180 : :
2181 : : return PUBLISH_GENCOLS_NONE; /* keep compiler quiet */
2182 : : }
|