Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_publication.c
4 : : * publication C API manipulation
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : * IDENTIFICATION
10 : : * src/backend/catalog/pg_publication.c
11 : : *
12 : : *-------------------------------------------------------------------------
13 : : */
14 : :
15 : : #include "postgres.h"
16 : :
17 : : #include "access/genam.h"
18 : : #include "access/heapam.h"
19 : : #include "access/htup_details.h"
20 : : #include "access/tableam.h"
21 : : #include "catalog/catalog.h"
22 : : #include "catalog/dependency.h"
23 : : #include "catalog/indexing.h"
24 : : #include "catalog/namespace.h"
25 : : #include "catalog/objectaddress.h"
26 : : #include "catalog/partition.h"
27 : : #include "catalog/pg_inherits.h"
28 : : #include "catalog/pg_namespace.h"
29 : : #include "catalog/pg_publication.h"
30 : : #include "catalog/pg_publication_namespace.h"
31 : : #include "catalog/pg_publication_rel.h"
32 : : #include "catalog/pg_type.h"
33 : : #include "commands/publicationcmds.h"
34 : : #include "funcapi.h"
35 : : #include "utils/array.h"
36 : : #include "utils/builtins.h"
37 : : #include "utils/catcache.h"
38 : : #include "utils/fmgroids.h"
39 : : #include "utils/lsyscache.h"
40 : : #include "utils/rel.h"
41 : : #include "utils/syscache.h"
42 : :
43 : : /* Records association between publication and published table */
44 : : typedef struct
45 : : {
46 : : Oid relid; /* OID of published table */
47 : : Oid pubid; /* OID of publication that publishes this
48 : : * table. */
49 : : } published_rel;
50 : :
51 : : /*
52 : : * Check if relation can be in given publication and throws appropriate
53 : : * error if not.
54 : : */
55 : : static void
11 akapila@postgresql.o 56 :GNC 610 : check_publication_add_relation(PublicationRelInfo *pri)
57 : : {
58 : 610 : Relation targetrel = pri->relation;
59 : : const char *errormsg;
60 : :
61 [ + + ]: 610 : if (pri->except)
62 : 42 : errormsg = gettext_noop("cannot use publication EXCEPT clause for relation \"%s\"");
63 : : else
64 : 568 : errormsg = gettext_noop("cannot add relation \"%s\" to publication");
65 : :
66 : : /* If in EXCEPT clause, must be root partitioned table */
67 [ + + + + ]: 610 : if (pri->except && targetrel->rd_rel->relispartition)
68 [ + - ]: 3 : ereport(ERROR,
69 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
70 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
71 : : errdetail("This operation is not supported for individual partitions.")));
72 : :
73 : : /* Must be a regular or partitioned table */
2196 peter@eisentraut.org 74 [ + + ]:CBC 607 : if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
1438 tomas.vondra@postgre 75 [ + + ]: 77 : RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
3342 peter_e@gmx.net 76 [ + - ]: 7 : ereport(ERROR,
77 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
78 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
79 : : errdetail_relkind_not_supported(RelationGetForm(targetrel)->relkind)));
80 : :
81 : : /* Can't be system table */
82 [ + + ]: 600 : if (IsCatalogRelation(targetrel))
83 [ + - ]: 3 : ereport(ERROR,
84 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
85 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
86 : : errdetail("This operation is not supported for system tables.")));
87 : :
88 : : /* UNLOGGED and TEMP relations cannot be part of publication. */
1579 dgustafsson@postgres 89 [ + + ]: 597 : if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
3342 peter_e@gmx.net 90 [ + - ]: 3 : ereport(ERROR,
91 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
92 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
93 : : errdetail("This operation is not supported for temporary tables.")));
1579 dgustafsson@postgres 94 [ + + ]: 594 : else if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED)
95 [ + - ]: 3 : ereport(ERROR,
96 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
97 : : errmsg(errormsg, RelationGetRelationName(targetrel)),
98 : : errdetail("This operation is not supported for unlogged tables.")));
3342 peter_e@gmx.net 99 : 591 : }
100 : :
101 : : /*
102 : : * Check if schema can be in given publication and throw appropriate error if
103 : : * not.
104 : : */
105 : : static void
1600 akapila@postgresql.o 106 : 122 : check_publication_add_schema(Oid schemaid)
107 : : {
108 : : /* Can't be system namespace */
109 [ + + - + ]: 122 : if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid))
110 [ + - ]: 3 : ereport(ERROR,
111 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
112 : : errmsg("cannot add schema \"%s\" to publication",
113 : : get_namespace_name(schemaid)),
114 : : errdetail("This operation is not supported for system schemas.")));
115 : :
116 : : /* Can't be temporary namespace */
117 [ - + ]: 119 : if (isAnyTempNamespace(schemaid))
1600 akapila@postgresql.o 118 [ # # ]:UBC 0 : ereport(ERROR,
119 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120 : : errmsg("cannot add schema \"%s\" to publication",
121 : : get_namespace_name(schemaid)),
122 : : errdetail("Temporary schemas cannot be replicated.")));
1600 akapila@postgresql.o 123 :CBC 119 : }
124 : :
125 : : /*
126 : : * Returns if relation represented by oid and Form_pg_class entry
127 : : * is publishable.
128 : : *
129 : : * Does same checks as check_publication_add_relation() above except for
130 : : * RELKIND_SEQUENCE, but does not need relation to be opened and also does
131 : : * not throw errors. Here, the additional check is to support ALL SEQUENCES
132 : : * publication.
133 : : *
134 : : * XXX This also excludes all tables with relid < FirstNormalObjectId,
135 : : * ie all tables created during initdb. This mainly affects the preinstalled
136 : : * information_schema. IsCatalogRelationOid() only excludes tables with
137 : : * relid < FirstUnpinnedObjectId, making that test rather redundant,
138 : : * but really we should get rid of the FirstNormalObjectId test not
139 : : * IsCatalogRelationOid. We can't do so today because we don't want
140 : : * information_schema tables to be considered publishable; but this test
141 : : * is really inadequate for that, since the information_schema could be
142 : : * dropped and reloaded and then it'll be considered publishable. The best
143 : : * long-term solution may be to add a "relispublishable" bool to pg_class,
144 : : * and depend on that instead of OID checks.
145 : : */
146 : : static bool
3342 peter_e@gmx.net 147 : 304849 : is_publishable_class(Oid relid, Form_pg_class reltuple)
148 : : {
2196 peter@eisentraut.org 149 : 311063 : return (reltuple->relkind == RELKIND_RELATION ||
157 akapila@postgresql.o 150 [ + + ]:GNC 6214 : reltuple->relkind == RELKIND_PARTITIONED_TABLE ||
151 [ + + ]: 5432 : reltuple->relkind == RELKIND_SEQUENCE) &&
2503 tgl@sss.pgh.pa.us 152 [ + + ]:CBC 300450 : !IsCatalogRelationOid(relid) &&
3342 peter_e@gmx.net 153 [ + + + + : 609698 : reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
+ + ]
154 : : relid >= FirstNormalObjectId;
155 : : }
156 : :
157 : : /*
158 : : * Another variant of is_publishable_class(), taking a Relation.
159 : : */
160 : : bool
1325 akapila@postgresql.o 161 : 274937 : is_publishable_relation(Relation rel)
162 : : {
163 : 274937 : return is_publishable_class(RelationGetRelid(rel), rel->rd_rel);
164 : : }
165 : :
166 : : /*
167 : : * SQL-callable variant of the above
168 : : *
169 : : * This returns null when the relation does not exist. This is intended to be
170 : : * used for example in psql to avoid gratuitous errors when there are
171 : : * concurrent catalog changes.
172 : : */
173 : : Datum
1321 174 : 3220 : pg_relation_is_publishable(PG_FUNCTION_ARGS)
175 : : {
176 : 3220 : Oid relid = PG_GETARG_OID(0);
177 : : HeapTuple tuple;
178 : : bool result;
179 : :
180 : 3220 : tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
181 [ - + ]: 3220 : if (!HeapTupleIsValid(tuple))
1321 akapila@postgresql.o 182 :UBC 0 : PG_RETURN_NULL();
1321 akapila@postgresql.o 183 :CBC 3220 : result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
184 : 3220 : ReleaseSysCache(tuple);
185 : 3220 : PG_RETURN_BOOL(result);
186 : : }
187 : :
188 : : /*
189 : : * Returns true if the ancestor is in the list of published relations.
190 : : * Otherwise, returns false.
191 : : */
192 : : static bool
1082 193 : 101 : is_ancestor_member_tableinfos(Oid ancestor, List *table_infos)
194 : : {
195 : : ListCell *lc;
196 : :
197 [ + - + + : 351 : foreach(lc, table_infos)
+ + ]
198 : : {
199 : 290 : Oid relid = ((published_rel *) lfirst(lc))->relid;
200 : :
201 [ + + ]: 290 : if (relid == ancestor)
202 : 40 : return true;
203 : : }
204 : :
205 : 61 : return false;
206 : : }
207 : :
208 : : /*
209 : : * Filter out the partitions whose parent tables are also present in the list.
210 : : */
211 : : static void
212 : 179 : filter_partitions(List *table_infos)
213 : : {
214 : : ListCell *lc;
215 : :
216 [ + - + + : 523 : foreach(lc, table_infos)
+ + ]
217 : : {
1600 218 : 344 : bool skip = false;
219 : 344 : List *ancestors = NIL;
220 : : ListCell *lc2;
1082 221 : 344 : published_rel *table_info = (published_rel *) lfirst(lc);
222 : :
223 [ + + ]: 344 : if (get_rel_relispartition(table_info->relid))
224 : 101 : ancestors = get_partition_ancestors(table_info->relid);
225 : :
1600 226 [ + + + + : 405 : foreach(lc2, ancestors)
+ + ]
227 : : {
228 : 101 : Oid ancestor = lfirst_oid(lc2);
229 : :
1082 230 [ + + ]: 101 : if (is_ancestor_member_tableinfos(ancestor, table_infos))
231 : : {
1600 232 : 40 : skip = true;
233 : 40 : break;
234 : : }
235 : : }
236 : :
1082 237 [ + + ]: 344 : if (skip)
238 : 40 : table_infos = foreach_delete_current(table_infos, lc);
239 : : }
1600 240 : 179 : }
241 : :
242 : : /*
243 : : * Returns true if any schema is associated with the publication, false if no
244 : : * schema is associated with the publication.
245 : : */
246 : : bool
1558 247 : 142 : is_schema_publication(Oid pubid)
248 : : {
249 : : Relation pubschsrel;
250 : : ScanKeyData scankey;
251 : : SysScanDesc scan;
252 : : HeapTuple tup;
253 : 142 : bool result = false;
254 : :
255 : 142 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
256 : 142 : ScanKeyInit(&scankey,
257 : : Anum_pg_publication_namespace_pnpubid,
258 : : BTEqualStrategyNumber, F_OIDEQ,
259 : : ObjectIdGetDatum(pubid));
260 : :
261 : 142 : scan = systable_beginscan(pubschsrel,
262 : : PublicationNamespacePnnspidPnpubidIndexId,
263 : : true, NULL, 1, &scankey);
264 : 142 : tup = systable_getnext(scan);
265 : 142 : result = HeapTupleIsValid(tup);
266 : :
267 : 142 : systable_endscan(scan);
268 : 142 : table_close(pubschsrel, AccessShareLock);
269 : :
270 : 142 : return result;
271 : : }
272 : :
273 : : /*
274 : : * Returns true if the relation has column list associated with the
275 : : * publication, false otherwise.
276 : : *
277 : : * If a column list is found, the corresponding bitmap is returned through the
278 : : * cols parameter, if provided. The bitmap is constructed within the given
279 : : * memory context (mcxt).
280 : : */
281 : : bool
493 282 : 811 : check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt,
283 : : Bitmapset **cols)
284 : : {
285 : : HeapTuple cftuple;
286 : 811 : bool found = false;
287 : :
288 [ + + ]: 811 : if (pub->alltables)
289 : 194 : return false;
290 : :
291 : 617 : cftuple = SearchSysCache2(PUBLICATIONRELMAP,
292 : : ObjectIdGetDatum(relid),
293 : : ObjectIdGetDatum(pub->oid));
294 [ + + ]: 617 : if (HeapTupleIsValid(cftuple))
295 : : {
296 : : Datum cfdatum;
297 : : bool isnull;
298 : :
299 : : /* Lookup the column list attribute. */
300 : 565 : cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
301 : : Anum_pg_publication_rel_prattrs, &isnull);
302 : :
303 : : /* Was a column list found? */
304 [ + + ]: 565 : if (!isnull)
305 : : {
306 : : /* Build the column list bitmap in the given memory context. */
307 [ + + ]: 157 : if (cols)
308 : 154 : *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt);
309 : :
310 : 157 : found = true;
311 : : }
312 : :
313 : 565 : ReleaseSysCache(cftuple);
314 : : }
315 : :
316 : 617 : return found;
317 : : }
318 : :
319 : : /*
320 : : * Gets the relations based on the publication partition option for a specified
321 : : * relation.
322 : : */
323 : : List *
1635 324 : 3005 : GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
325 : : Oid relid)
326 : : {
327 [ + + + + ]: 3005 : if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE &&
328 : : pub_partopt != PUBLICATION_PART_ROOT)
329 : 602 : {
330 : 602 : List *all_parts = find_all_inheritors(relid, NoLock,
331 : : NULL);
332 : :
333 [ + + ]: 602 : if (pub_partopt == PUBLICATION_PART_ALL)
334 : 498 : result = list_concat(result, all_parts);
335 [ + - ]: 104 : else if (pub_partopt == PUBLICATION_PART_LEAF)
336 : : {
337 : : ListCell *lc;
338 : :
339 [ + - + + : 384 : foreach(lc, all_parts)
+ + ]
340 : : {
341 : 280 : Oid partOid = lfirst_oid(lc);
342 : :
343 [ + + ]: 280 : if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
344 : 173 : result = lappend_oid(result, partOid);
345 : : }
346 : : }
347 : : else
1635 akapila@postgresql.o 348 :UBC 0 : Assert(false);
349 : : }
350 : : else
1635 akapila@postgresql.o 351 :CBC 2403 : result = lappend_oid(result, relid);
352 : :
353 : 3005 : return result;
354 : : }
355 : :
356 : : /*
357 : : * Returns the relid of the topmost ancestor that is published via this
358 : : * publication if any and set its ancestor level to ancestor_level,
359 : : * otherwise returns InvalidOid.
360 : : *
361 : : * The ancestor_level value allows us to compare the results for multiple
362 : : * publications, and decide which value is higher up.
363 : : *
364 : : * Note that the list of ancestors should be ordered such that the topmost
365 : : * ancestor is at the end of the list.
366 : : */
367 : : Oid
1460 tomas.vondra@postgre 368 : 255 : GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level)
369 : : {
370 : : ListCell *lc;
1482 akapila@postgresql.o 371 : 255 : Oid topmost_relid = InvalidOid;
1460 tomas.vondra@postgre 372 : 255 : int level = 0;
373 : :
374 : : /*
375 : : * Find the "topmost" ancestor that is in this publication.
376 : : */
1482 akapila@postgresql.o 377 [ + - + + : 518 : foreach(lc, ancestors)
+ + ]
378 : : {
379 : 263 : Oid ancestor = lfirst_oid(lc);
11 akapila@postgresql.o 380 :GNC 263 : List *apubids = GetRelationIncludedPublications(ancestor);
1482 akapila@postgresql.o 381 :CBC 263 : List *aschemaPubids = NIL;
382 : :
1460 tomas.vondra@postgre 383 : 263 : level++;
384 : :
1482 akapila@postgresql.o 385 [ + + ]: 263 : if (list_member_oid(apubids, puboid))
386 : : {
387 : 156 : topmost_relid = ancestor;
388 : :
1460 tomas.vondra@postgre 389 [ + + ]: 156 : if (ancestor_level)
390 : 43 : *ancestor_level = level;
391 : : }
392 : : else
393 : : {
1438 394 : 107 : aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor));
1482 akapila@postgresql.o 395 [ + + ]: 107 : if (list_member_oid(aschemaPubids, puboid))
396 : : {
397 : 5 : topmost_relid = ancestor;
398 : :
1460 tomas.vondra@postgre 399 [ + - ]: 5 : if (ancestor_level)
400 : 5 : *ancestor_level = level;
401 : : }
402 : : }
403 : :
1482 akapila@postgresql.o 404 : 263 : list_free(apubids);
405 : 263 : list_free(aschemaPubids);
406 : : }
407 : :
408 : 255 : return topmost_relid;
409 : : }
410 : :
411 : : /*
412 : : * attnumstoint2vector
413 : : * Convert a Bitmapset of AttrNumbers into an int2vector.
414 : : *
415 : : * AttrNumber numbers are 0-based, i.e., not offset by
416 : : * FirstLowInvalidHeapAttributeNumber.
417 : : */
418 : : static int2vector *
577 drowley@postgresql.o 419 : 166 : attnumstoint2vector(Bitmapset *attrs)
420 : : {
421 : : int2vector *result;
422 : 166 : int n = bms_num_members(attrs);
423 : 166 : int i = -1;
424 : 166 : int j = 0;
425 : :
426 : 166 : result = buildint2vector(NULL, n);
427 : :
428 [ + + ]: 452 : while ((i = bms_next_member(attrs, i)) >= 0)
429 : : {
430 [ - + ]: 286 : Assert(i <= PG_INT16_MAX);
431 : :
432 : 286 : result->values[j++] = (int16) i;
433 : : }
434 : :
435 : 166 : return result;
436 : : }
437 : :
438 : : /*
439 : : * Insert new publication / relation mapping.
440 : : */
441 : : ObjectAddress
1482 akapila@postgresql.o 442 : 621 : publication_add_relation(Oid pubid, PublicationRelInfo *pri,
443 : : bool if_not_exists)
444 : : {
445 : : Relation rel;
446 : : HeapTuple tup;
447 : : Datum values[Natts_pg_publication_rel];
448 : : bool nulls[Natts_pg_publication_rel];
449 : 621 : Relation targetrel = pri->relation;
450 : 621 : Oid relid = RelationGetRelid(targetrel);
451 : : Oid pubreloid;
452 : : Bitmapset *attnums;
3342 peter_e@gmx.net 453 : 621 : Publication *pub = GetPublication(pubid);
454 : : ObjectAddress myself,
455 : : referenced;
1635 akapila@postgresql.o 456 : 621 : List *relids = NIL;
457 : : int i;
458 : :
2610 andres@anarazel.de 459 : 621 : rel = table_open(PublicationRelRelationId, RowExclusiveLock);
460 : :
461 : : /*
462 : : * Check for duplicates. Note that this does not really prevent
463 : : * duplicates, it's here just to provide nicer error message in common
464 : : * case. The real protection is the unique key on the catalog.
465 : : */
3342 peter_e@gmx.net 466 [ + + ]: 621 : if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid),
467 : : ObjectIdGetDatum(pubid)))
468 : : {
2610 andres@anarazel.de 469 : 11 : table_close(rel, RowExclusiveLock);
470 : :
3342 peter_e@gmx.net 471 [ + + ]: 11 : if (if_not_exists)
472 : 8 : return InvalidObjectAddress;
473 : :
474 [ + - ]: 3 : ereport(ERROR,
475 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
476 : : errmsg("relation \"%s\" is already member of publication \"%s\"",
477 : : RelationGetRelationName(targetrel), pub->name)));
478 : : }
479 : :
11 akapila@postgresql.o 480 :GNC 610 : check_publication_add_relation(pri);
481 : :
482 : : /* Validate and translate column names into a Bitmapset of attnums. */
577 drowley@postgresql.o 483 :CBC 591 : attnums = pub_collist_validate(pri->relation, pri->columns);
484 : :
485 : : /* Form a tuple. */
3342 peter_e@gmx.net 486 : 579 : memset(values, 0, sizeof(values));
487 : 579 : memset(nulls, false, sizeof(nulls));
488 : :
1536 alvherre@alvh.no-ip. 489 : 579 : pubreloid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId,
490 : : Anum_pg_publication_rel_oid);
491 : 579 : values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(pubreloid);
3342 peter_e@gmx.net 492 : 579 : values[Anum_pg_publication_rel_prpubid - 1] =
493 : 579 : ObjectIdGetDatum(pubid);
494 : 579 : values[Anum_pg_publication_rel_prrelid - 1] =
495 : 579 : ObjectIdGetDatum(relid);
11 akapila@postgresql.o 496 :GNC 579 : values[Anum_pg_publication_rel_prexcept - 1] =
497 : 579 : BoolGetDatum(pri->except);
498 : :
499 : : /* Add qualifications, if available */
1482 akapila@postgresql.o 500 [ + + ]:CBC 579 : if (pri->whereClause != NULL)
501 : 165 : values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(pri->whereClause));
502 : : else
503 : 414 : nulls[Anum_pg_publication_rel_prqual - 1] = true;
504 : :
505 : : /* Add column list, if available */
1450 tomas.vondra@postgre 506 [ + + ]: 579 : if (pri->columns)
577 drowley@postgresql.o 507 : 166 : values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
508 : : else
1450 tomas.vondra@postgre 509 : 413 : nulls[Anum_pg_publication_rel_prattrs - 1] = true;
510 : :
3342 peter_e@gmx.net 511 : 579 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
512 : :
513 : : /* Insert tuple into catalog. */
2672 andres@anarazel.de 514 : 579 : CatalogTupleInsert(rel, tup);
3342 peter_e@gmx.net 515 : 579 : heap_freetuple(tup);
516 : :
517 : : /* Register dependencies as needed */
1536 alvherre@alvh.no-ip. 518 : 579 : ObjectAddressSet(myself, PublicationRelRelationId, pubreloid);
519 : :
520 : : /* Add dependency on the publication */
3342 peter_e@gmx.net 521 : 579 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
522 : 579 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
523 : :
524 : : /* Add dependency on the relation */
525 : 579 : ObjectAddressSet(referenced, RelationRelationId, relid);
526 : 579 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
527 : :
528 : : /* Add dependency on the objects mentioned in the qualifications */
1482 akapila@postgresql.o 529 [ + + ]: 579 : if (pri->whereClause)
530 : 165 : recordDependencyOnSingleRelExpr(&myself, pri->whereClause, relid,
531 : : DEPENDENCY_NORMAL, DEPENDENCY_NORMAL,
532 : : false);
533 : :
534 : : /* Add dependency on the columns, if any are listed */
577 drowley@postgresql.o 535 : 579 : i = -1;
536 [ + + ]: 865 : while ((i = bms_next_member(attnums, i)) >= 0)
537 : : {
538 : 286 : ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
1450 tomas.vondra@postgre 539 : 286 : recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
540 : : }
541 : :
542 : : /* Close the table. */
2610 andres@anarazel.de 543 : 579 : table_close(rel, RowExclusiveLock);
544 : :
545 : : /*
546 : : * Relations excluded via the EXCEPT clause do not need explicit
547 : : * invalidation as CreatePublication() function invalidates all relations
548 : : * as part of defining a FOR ALL TABLES publication.
549 : : */
11 akapila@postgresql.o 550 [ + + ]:GNC 579 : if (!pri->except)
551 : : {
552 : : /*
553 : : * Invalidate relcache so that publication info is rebuilt.
554 : : *
555 : : * For the partitioned tables, we must invalidate all partitions
556 : : * contained in the respective partition hierarchies, not just the one
557 : : * explicitly mentioned in the publication. This is required because
558 : : * we implicitly publish the child tables when the parent table is
559 : : * published.
560 : : */
561 : 540 : relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
562 : : relid);
563 : :
564 : 540 : InvalidatePublicationRels(relids);
565 : : }
566 : :
3342 peter_e@gmx.net 567 :CBC 579 : return myself;
568 : : }
569 : :
570 : : /*
571 : : * pub_collist_validate
572 : : * Process and validate the 'columns' list and ensure the columns are all
573 : : * valid to use for a publication. Checks for and raises an ERROR for
574 : : * any unknown columns, system columns, duplicate columns, or virtual
575 : : * generated columns.
576 : : *
577 : : * Looks up each column's attnum and returns a 0-based Bitmapset of the
578 : : * corresponding attnums.
579 : : */
580 : : Bitmapset *
577 drowley@postgresql.o 581 : 793 : pub_collist_validate(Relation targetrel, List *columns)
582 : : {
1450 tomas.vondra@postgre 583 : 793 : Bitmapset *set = NULL;
584 : : ListCell *lc;
401 peter@eisentraut.org 585 : 793 : TupleDesc tupdesc = RelationGetDescr(targetrel);
586 : :
1450 tomas.vondra@postgre 587 [ + + + + : 1215 : foreach(lc, columns)
+ + ]
588 : : {
589 : 440 : char *colname = strVal(lfirst(lc));
590 : 440 : AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname);
591 : :
592 [ + + ]: 440 : if (attnum == InvalidAttrNumber)
593 [ + - ]: 3 : ereport(ERROR,
594 : : errcode(ERRCODE_UNDEFINED_COLUMN),
595 : : errmsg("column \"%s\" of relation \"%s\" does not exist",
596 : : colname, RelationGetRelationName(targetrel)));
597 : :
598 [ + + ]: 437 : if (!AttrNumberIsForUserDefinedAttr(attnum))
599 [ + - ]: 6 : ereport(ERROR,
600 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
601 : : errmsg("cannot use system column \"%s\" in publication column list",
602 : : colname));
603 : :
401 peter@eisentraut.org 604 [ + + ]: 431 : if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated == ATTRIBUTE_GENERATED_VIRTUAL)
605 [ + - ]: 3 : ereport(ERROR,
606 : : errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
607 : : errmsg("cannot use virtual generated column \"%s\" in publication column list",
608 : : colname));
609 : :
1450 tomas.vondra@postgre 610 [ + + ]: 428 : if (bms_is_member(attnum, set))
611 [ + - ]: 6 : ereport(ERROR,
612 : : errcode(ERRCODE_DUPLICATE_OBJECT),
613 : : errmsg("duplicate column \"%s\" in publication column list",
614 : : colname));
615 : :
616 : 422 : set = bms_add_member(set, attnum);
617 : : }
618 : :
577 drowley@postgresql.o 619 : 775 : return set;
620 : : }
621 : :
622 : : /*
623 : : * Transform a column list (represented by an array Datum) to a bitmapset.
624 : : *
625 : : * If columns isn't NULL, add the column numbers to that set.
626 : : *
627 : : * If mcxt isn't NULL, build the bitmapset in that context.
628 : : */
629 : : Bitmapset *
1450 tomas.vondra@postgre 630 : 222 : pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
631 : : {
577 drowley@postgresql.o 632 : 222 : Bitmapset *result = columns;
633 : : ArrayType *arr;
634 : : int nelems;
635 : : int16 *elems;
1403 tgl@sss.pgh.pa.us 636 : 222 : MemoryContext oldcxt = NULL;
637 : :
1450 tomas.vondra@postgre 638 : 222 : arr = DatumGetArrayTypeP(pubcols);
639 : 222 : nelems = ARR_DIMS(arr)[0];
640 [ - + ]: 222 : elems = (int16 *) ARR_DATA_PTR(arr);
641 : :
642 : : /* If a memory context was specified, switch to it. */
643 [ + + ]: 222 : if (mcxt)
644 : 39 : oldcxt = MemoryContextSwitchTo(mcxt);
645 : :
646 [ + + ]: 613 : for (int i = 0; i < nelems; i++)
647 : 391 : result = bms_add_member(result, elems[i]);
648 : :
649 [ + + ]: 222 : if (mcxt)
650 : 39 : MemoryContextSwitchTo(oldcxt);
651 : :
652 : 222 : return result;
653 : : }
654 : :
655 : : /*
656 : : * Returns a bitmap representing the columns of the specified table.
657 : : *
658 : : * Generated columns are included if include_gencols_type is
659 : : * PUBLISH_GENCOLS_STORED.
660 : : */
661 : : Bitmapset *
416 akapila@postgresql.o 662 : 9 : pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type)
663 : : {
493 664 : 9 : Bitmapset *result = NULL;
665 : 9 : TupleDesc desc = RelationGetDescr(relation);
666 : :
667 [ + + ]: 30 : for (int i = 0; i < desc->natts; i++)
668 : : {
669 : 21 : Form_pg_attribute att = TupleDescAttr(desc, i);
670 : :
416 671 [ + + ]: 21 : if (att->attisdropped)
493 672 : 1 : continue;
673 : :
416 674 [ + + ]: 20 : if (att->attgenerated)
675 : : {
676 : : /* We only support replication of STORED generated cols. */
677 [ + + ]: 2 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
678 : 1 : continue;
679 : :
680 : : /* User hasn't requested to replicate STORED generated cols. */
681 [ + - ]: 1 : if (include_gencols_type != PUBLISH_GENCOLS_STORED)
682 : 1 : continue;
683 : : }
684 : :
493 685 : 18 : result = bms_add_member(result, att->attnum);
686 : : }
687 : :
688 : 9 : return result;
689 : : }
690 : :
691 : : /*
692 : : * Insert new publication / schema mapping.
693 : : */
694 : : ObjectAddress
1438 tomas.vondra@postgre 695 : 131 : publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists)
696 : : {
697 : : Relation rel;
698 : : HeapTuple tup;
699 : : Datum values[Natts_pg_publication_namespace];
700 : : bool nulls[Natts_pg_publication_namespace];
701 : : Oid psschid;
1600 akapila@postgresql.o 702 : 131 : Publication *pub = GetPublication(pubid);
703 : 131 : List *schemaRels = NIL;
704 : : ObjectAddress myself,
705 : : referenced;
706 : :
707 : 131 : rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
708 : :
709 : : /*
710 : : * Check for duplicates. Note that this does not really prevent
711 : : * duplicates, it's here just to provide nicer error message in common
712 : : * case. The real protection is the unique key on the catalog.
713 : : */
1438 tomas.vondra@postgre 714 [ + + ]: 131 : if (SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
715 : : ObjectIdGetDatum(schemaid),
716 : : ObjectIdGetDatum(pubid)))
717 : : {
1600 akapila@postgresql.o 718 : 9 : table_close(rel, RowExclusiveLock);
719 : :
720 [ + + ]: 9 : if (if_not_exists)
721 : 6 : return InvalidObjectAddress;
722 : :
723 [ + - ]: 3 : ereport(ERROR,
724 : : (errcode(ERRCODE_DUPLICATE_OBJECT),
725 : : errmsg("schema \"%s\" is already member of publication \"%s\"",
726 : : get_namespace_name(schemaid), pub->name)));
727 : : }
728 : :
729 : 122 : check_publication_add_schema(schemaid);
730 : :
731 : : /* Form a tuple */
732 : 119 : memset(values, 0, sizeof(values));
733 : 119 : memset(nulls, false, sizeof(nulls));
734 : :
735 : 119 : psschid = GetNewOidWithIndex(rel, PublicationNamespaceObjectIndexId,
736 : : Anum_pg_publication_namespace_oid);
737 : 119 : values[Anum_pg_publication_namespace_oid - 1] = ObjectIdGetDatum(psschid);
738 : 119 : values[Anum_pg_publication_namespace_pnpubid - 1] =
739 : 119 : ObjectIdGetDatum(pubid);
740 : 119 : values[Anum_pg_publication_namespace_pnnspid - 1] =
741 : 119 : ObjectIdGetDatum(schemaid);
742 : :
743 : 119 : tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
744 : :
745 : : /* Insert tuple into catalog */
746 : 119 : CatalogTupleInsert(rel, tup);
747 : 119 : heap_freetuple(tup);
748 : :
749 : 119 : ObjectAddressSet(myself, PublicationNamespaceRelationId, psschid);
750 : :
751 : : /* Add dependency on the publication */
752 : 119 : ObjectAddressSet(referenced, PublicationRelationId, pubid);
753 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
754 : :
755 : : /* Add dependency on the schema */
756 : 119 : ObjectAddressSet(referenced, NamespaceRelationId, schemaid);
757 : 119 : recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
758 : :
759 : : /* Close the table */
760 : 119 : table_close(rel, RowExclusiveLock);
761 : :
762 : : /*
763 : : * Invalidate relcache so that publication info is rebuilt. See
764 : : * publication_add_relation for why we need to consider all the
765 : : * partitions.
766 : : */
1438 tomas.vondra@postgre 767 : 119 : schemaRels = GetSchemaPublicationRelations(schemaid,
768 : : PUBLICATION_PART_ALL);
1600 akapila@postgresql.o 769 : 119 : InvalidatePublicationRels(schemaRels);
770 : :
771 : 119 : return myself;
772 : : }
773 : :
774 : : /*
775 : : * Internal function to get the list of publication oids for a relation.
776 : : *
777 : : * If except_flag is true, returns the list of publication that specified the
778 : : * relation in EXCEPT clause; otherwise, returns the list of publications
779 : : * in which relation is included.
780 : : */
781 : : static List *
11 akapila@postgresql.o 782 :GNC 12947 : get_relation_publications(Oid relid, bool except_flag)
783 : : {
3224 bruce@momjian.us 784 :CBC 12947 : List *result = NIL;
785 : : CatCList *pubrellist;
786 : :
787 : : /* Find all publications associated with the relation. */
3342 peter_e@gmx.net 788 : 12947 : pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP,
789 : : ObjectIdGetDatum(relid));
11 akapila@postgresql.o 790 [ + + ]:GNC 14202 : for (int i = 0; i < pubrellist->n_members; i++)
791 : : {
3342 peter_e@gmx.net 792 :CBC 1255 : HeapTuple tup = &pubrellist->members[i]->tuple;
11 akapila@postgresql.o 793 :GNC 1255 : Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
794 : 1255 : Oid pubid = pubrel->prpubid;
795 : :
796 [ + + ]: 1255 : if (pubrel->prexcept == except_flag)
797 : 886 : result = lappend_oid(result, pubid);
798 : : }
799 : :
3342 peter_e@gmx.net 800 :CBC 12947 : ReleaseSysCacheList(pubrellist);
801 : :
802 : 12947 : return result;
803 : : }
804 : :
805 : : /*
806 : : * Gets list of publication oids for a relation.
807 : : */
808 : : List *
11 akapila@postgresql.o 809 :GNC 6941 : GetRelationIncludedPublications(Oid relid)
810 : : {
811 : 6941 : return get_relation_publications(relid, false);
812 : : }
813 : :
814 : : /*
815 : : * Gets list of publication oids which has relation in EXCEPT clause.
816 : : */
817 : : List *
818 : 6006 : GetRelationExcludedPublications(Oid relid)
819 : : {
820 : 6006 : return get_relation_publications(relid, true);
821 : : }
822 : :
823 : : /*
824 : : * Internal function to get the list of relation oids for a publication.
825 : : *
826 : : * If except_flag is true, returns the list of relations specified in the
827 : : * EXCEPT clause of the publication; otherwise, returns the list of relations
828 : : * included in the publication.
829 : : */
830 : : static List *
831 : 1407 : get_publication_relations(Oid pubid, PublicationPartOpt pub_partopt,
832 : : bool except_flag)
833 : : {
834 : : List *result;
835 : : Relation pubrelsrel;
836 : : ScanKeyData scankey;
837 : : SysScanDesc scan;
838 : : HeapTuple tup;
839 : :
840 : : /* Find all relations associated with the publication. */
2610 andres@anarazel.de 841 :CBC 1407 : pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock);
842 : :
3342 peter_e@gmx.net 843 : 1407 : ScanKeyInit(&scankey,
844 : : Anum_pg_publication_rel_prpubid,
845 : : BTEqualStrategyNumber, F_OIDEQ,
846 : : ObjectIdGetDatum(pubid));
847 : :
1523 alvherre@alvh.no-ip. 848 : 1407 : scan = systable_beginscan(pubrelsrel, PublicationRelPrpubidIndexId,
849 : : true, NULL, 1, &scankey);
850 : :
3342 peter_e@gmx.net 851 : 1407 : result = NIL;
852 [ + + ]: 4411 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
853 : : {
854 : : Form_pg_publication_rel pubrel;
855 : :
856 : 1597 : pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
857 : :
11 akapila@postgresql.o 858 [ - + ]:GNC 1597 : if (except_flag == pubrel->prexcept)
859 : 1597 : result = GetPubPartitionOptionRelations(result, pub_partopt,
860 : : pubrel->prrelid);
861 : : }
862 : :
3342 peter_e@gmx.net 863 :CBC 1407 : systable_endscan(scan);
2610 andres@anarazel.de 864 : 1407 : table_close(pubrelsrel, AccessShareLock);
865 : :
866 : : /* Now sort and de-duplicate the result list */
1558 akapila@postgresql.o 867 : 1407 : list_sort(result, list_oid_cmp);
868 : 1407 : list_deduplicate_oid(result);
869 : :
3342 peter_e@gmx.net 870 : 1407 : return result;
871 : : }
872 : :
873 : : /*
874 : : * Gets list of relation oids that are associated with a publication.
875 : : *
876 : : * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES
877 : : * should use GetAllPublicationRelations().
878 : : */
879 : : List *
11 akapila@postgresql.o 880 :GNC 1215 : GetIncludedPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
881 : : {
882 [ - + ]: 1215 : Assert(!GetPublication(pubid)->alltables);
883 : :
884 : 1215 : return get_publication_relations(pubid, pub_partopt, false);
885 : : }
886 : :
887 : : /*
888 : : * Gets list of table oids that were specified in the EXCEPT clause for a
889 : : * publication.
890 : : *
891 : : * This should only be used FOR ALL TABLES publications.
892 : : */
893 : : List *
894 : 192 : GetExcludedPublicationTables(Oid pubid, PublicationPartOpt pub_partopt)
895 : : {
896 [ - + ]: 192 : Assert(GetPublication(pubid)->alltables);
897 : :
898 : 192 : return get_publication_relations(pubid, pub_partopt, true);
899 : : }
900 : :
901 : : /*
902 : : * Gets list of publication oids for publications marked as FOR ALL TABLES.
903 : : */
904 : : List *
3342 peter_e@gmx.net 905 :CBC 4646 : GetAllTablesPublications(void)
906 : : {
907 : : List *result;
908 : : Relation rel;
909 : : ScanKeyData scankey;
910 : : SysScanDesc scan;
911 : : HeapTuple tup;
912 : :
913 : : /* Find all publications that are marked as for all tables. */
2610 andres@anarazel.de 914 : 4646 : rel = table_open(PublicationRelationId, AccessShareLock);
915 : :
3342 peter_e@gmx.net 916 : 4646 : ScanKeyInit(&scankey,
917 : : Anum_pg_publication_puballtables,
918 : : BTEqualStrategyNumber, F_BOOLEQ,
919 : : BoolGetDatum(true));
920 : :
921 : 4646 : scan = systable_beginscan(rel, InvalidOid, false,
922 : : NULL, 1, &scankey);
923 : :
924 : 4646 : result = NIL;
925 [ + + ]: 4754 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
926 : : {
2489 tgl@sss.pgh.pa.us 927 : 108 : Oid oid = ((Form_pg_publication) GETSTRUCT(tup))->oid;
928 : :
2672 andres@anarazel.de 929 : 108 : result = lappend_oid(result, oid);
930 : : }
931 : :
3342 peter_e@gmx.net 932 : 4646 : systable_endscan(scan);
2610 andres@anarazel.de 933 : 4646 : table_close(rel, AccessShareLock);
934 : :
3342 peter_e@gmx.net 935 : 4646 : return result;
936 : : }
937 : :
938 : : /*
939 : : * Gets list of all relations published by FOR ALL TABLES/SEQUENCES
940 : : * publication.
941 : : *
942 : : * If the publication publishes partition changes via their respective root
943 : : * partitioned tables, we must exclude partitions in favor of including the
944 : : * root partitioned tables. This is not applicable to FOR ALL SEQUENCES
945 : : * publication.
946 : : *
947 : : * For a FOR ALL TABLES publication, the returned list excludes tables mentioned
948 : : * in EXCEPT TABLE clause.
949 : : */
950 : : List *
11 akapila@postgresql.o 951 :GNC 198 : GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot)
952 : : {
953 : : Relation classRel;
954 : : ScanKeyData key[1];
955 : : TableScanDesc scan;
956 : : HeapTuple tuple;
3342 peter_e@gmx.net 957 :CBC 198 : List *result = NIL;
11 akapila@postgresql.o 958 :GNC 198 : List *exceptlist = NIL;
959 : :
157 960 [ + + - + ]: 198 : Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot));
961 : :
962 : : /* EXCEPT filtering applies only to relations, not sequences */
11 963 [ + + ]: 198 : if (relkind == RELKIND_RELATION)
964 : 192 : exceptlist = GetExcludedPublicationTables(pubid, pubviaroot ?
965 : 192 : PUBLICATION_PART_ROOT :
966 : : PUBLICATION_PART_LEAF);
967 : :
2610 andres@anarazel.de 968 :CBC 198 : classRel = table_open(RelationRelationId, AccessShareLock);
969 : :
3342 peter_e@gmx.net 970 : 198 : ScanKeyInit(&key[0],
971 : : Anum_pg_class_relkind,
972 : : BTEqualStrategyNumber, F_CHAREQ,
973 : : CharGetDatum(relkind));
974 : :
2561 andres@anarazel.de 975 : 198 : scan = table_beginscan_catalog(classRel, 1, key);
976 : :
3342 peter_e@gmx.net 977 [ + + ]: 14133 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
978 : : {
3224 bruce@momjian.us 979 : 13935 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
2672 andres@anarazel.de 980 : 13935 : Oid relid = relForm->oid;
981 : :
2167 peter@eisentraut.org 982 [ + + ]: 13935 : if (is_publishable_class(relid, relForm) &&
11 akapila@postgresql.o 983 [ + + + + ]:GNC 873 : !(relForm->relispartition && pubviaroot) &&
984 [ + + ]: 776 : !list_member_oid(exceptlist, relid))
3342 peter_e@gmx.net 985 :CBC 751 : result = lappend_oid(result, relid);
986 : : }
987 : :
2561 andres@anarazel.de 988 : 198 : table_endscan(scan);
989 : :
2167 peter@eisentraut.org 990 [ + + ]: 198 : if (pubviaroot)
991 : : {
992 : 16 : ScanKeyInit(&key[0],
993 : : Anum_pg_class_relkind,
994 : : BTEqualStrategyNumber, F_CHAREQ,
995 : : CharGetDatum(RELKIND_PARTITIONED_TABLE));
996 : :
997 : 16 : scan = table_beginscan_catalog(classRel, 1, key);
998 : :
999 [ + + ]: 87 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1000 : : {
1001 : 71 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1002 : 71 : Oid relid = relForm->oid;
1003 : :
1004 [ + - ]: 71 : if (is_publishable_class(relid, relForm) &&
11 akapila@postgresql.o 1005 [ + + ]:GNC 71 : !relForm->relispartition &&
1006 [ + + ]: 55 : !list_member_oid(exceptlist, relid))
2167 peter@eisentraut.org 1007 :CBC 52 : result = lappend_oid(result, relid);
1008 : : }
1009 : :
1010 : 16 : table_endscan(scan);
1011 : : }
1012 : :
2164 1013 : 198 : table_close(classRel, AccessShareLock);
3342 peter_e@gmx.net 1014 : 198 : return result;
1015 : : }
1016 : :
1017 : : /*
1018 : : * Gets the list of schema oids for a publication.
1019 : : *
1020 : : * This should only be used FOR TABLES IN SCHEMA publications.
1021 : : */
1022 : : List *
1438 tomas.vondra@postgre 1023 : 1169 : GetPublicationSchemas(Oid pubid)
1024 : : {
1600 akapila@postgresql.o 1025 : 1169 : List *result = NIL;
1026 : : Relation pubschsrel;
1027 : : ScanKeyData scankey;
1028 : : SysScanDesc scan;
1029 : : HeapTuple tup;
1030 : :
1031 : : /* Find all schemas associated with the publication */
1032 : 1169 : pubschsrel = table_open(PublicationNamespaceRelationId, AccessShareLock);
1033 : :
1438 tomas.vondra@postgre 1034 : 1169 : ScanKeyInit(&scankey,
1035 : : Anum_pg_publication_namespace_pnpubid,
1036 : : BTEqualStrategyNumber, F_OIDEQ,
1037 : : ObjectIdGetDatum(pubid));
1038 : :
1600 akapila@postgresql.o 1039 : 1169 : scan = systable_beginscan(pubschsrel,
1040 : : PublicationNamespacePnnspidPnpubidIndexId,
1041 : : true, NULL, 1, &scankey);
1042 [ + + ]: 1219 : while (HeapTupleIsValid(tup = systable_getnext(scan)))
1043 : : {
1044 : : Form_pg_publication_namespace pubsch;
1045 : :
1046 : 50 : pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1047 : :
1048 : 50 : result = lappend_oid(result, pubsch->pnnspid);
1049 : : }
1050 : :
1051 : 1169 : systable_endscan(scan);
1052 : 1169 : table_close(pubschsrel, AccessShareLock);
1053 : :
1054 : 1169 : return result;
1055 : : }
1056 : :
1057 : : /*
1058 : : * Gets the list of publication oids associated with a specified schema.
1059 : : */
1060 : : List *
1438 tomas.vondra@postgre 1061 : 6703 : GetSchemaPublications(Oid schemaid)
1062 : : {
1600 akapila@postgresql.o 1063 : 6703 : List *result = NIL;
1064 : : CatCList *pubschlist;
1065 : : int i;
1066 : :
1067 : : /* Find all publications associated with the schema */
1068 : 6703 : pubschlist = SearchSysCacheList1(PUBLICATIONNAMESPACEMAP,
1069 : : ObjectIdGetDatum(schemaid));
1070 [ + + ]: 6761 : for (i = 0; i < pubschlist->n_members; i++)
1071 : : {
1072 : 58 : HeapTuple tup = &pubschlist->members[i]->tuple;
1073 : 58 : Oid pubid = ((Form_pg_publication_namespace) GETSTRUCT(tup))->pnpubid;
1074 : :
1075 : 58 : result = lappend_oid(result, pubid);
1076 : : }
1077 : :
1078 : 6703 : ReleaseSysCacheList(pubschlist);
1079 : :
1080 : 6703 : return result;
1081 : : }
1082 : :
1083 : : /*
1084 : : * Get the list of publishable relation oids for a specified schema.
1085 : : */
1086 : : List *
1438 tomas.vondra@postgre 1087 : 250 : GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
1088 : : {
1089 : : Relation classRel;
1090 : : ScanKeyData key[1];
1091 : : TableScanDesc scan;
1092 : : HeapTuple tuple;
1600 akapila@postgresql.o 1093 : 250 : List *result = NIL;
1094 : :
1095 [ - + ]: 250 : Assert(OidIsValid(schemaid));
1096 : :
1097 : 250 : classRel = table_open(RelationRelationId, AccessShareLock);
1098 : :
1099 : 250 : ScanKeyInit(&key[0],
1100 : : Anum_pg_class_relnamespace,
1101 : : BTEqualStrategyNumber, F_OIDEQ,
1102 : : ObjectIdGetDatum(schemaid));
1103 : :
1104 : : /* get all the relations present in the specified schema */
1105 : 250 : scan = table_beginscan_catalog(classRel, 1, key);
1106 [ + + ]: 12936 : while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1107 : : {
1108 : 12686 : Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
1109 : 12686 : Oid relid = relForm->oid;
1110 : : char relkind;
1111 : :
1112 [ + + ]: 12686 : if (!is_publishable_class(relid, relForm))
1113 : 4351 : continue;
1114 : :
1115 : 8335 : relkind = get_rel_relkind(relid);
1438 tomas.vondra@postgre 1116 [ + + ]: 8335 : if (relkind == RELKIND_RELATION)
1117 : 7150 : result = lappend_oid(result, relid);
1118 [ + + ]: 1185 : else if (relkind == RELKIND_PARTITIONED_TABLE)
1119 : : {
1600 akapila@postgresql.o 1120 : 391 : List *partitionrels = NIL;
1121 : :
1122 : : /*
1123 : : * It is quite possible that some of the partitions are in a
1124 : : * different schema than the parent table, so we need to get such
1125 : : * partitions separately.
1126 : : */
1127 : 391 : partitionrels = GetPubPartitionOptionRelations(partitionrels,
1128 : : pub_partopt,
1129 : : relForm->oid);
1130 : 391 : result = list_concat_unique_oid(result, partitionrels);
1131 : : }
1132 : : }
1133 : :
1134 : 250 : table_endscan(scan);
1135 : 250 : table_close(classRel, AccessShareLock);
1136 : 250 : return result;
1137 : : }
1138 : :
1139 : : /*
1140 : : * Gets the list of all relations published by FOR TABLES IN SCHEMA
1141 : : * publication.
1142 : : */
1143 : : List *
1438 tomas.vondra@postgre 1144 : 967 : GetAllSchemaPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
1145 : : {
1600 akapila@postgresql.o 1146 : 967 : List *result = NIL;
1438 tomas.vondra@postgre 1147 : 967 : List *pubschemalist = GetPublicationSchemas(pubid);
1148 : : ListCell *cell;
1149 : :
1600 akapila@postgresql.o 1150 [ + + + + : 1002 : foreach(cell, pubschemalist)
+ + ]
1151 : : {
1152 : 35 : Oid schemaid = lfirst_oid(cell);
1153 : 35 : List *schemaRels = NIL;
1154 : :
1438 tomas.vondra@postgre 1155 : 35 : schemaRels = GetSchemaPublicationRelations(schemaid, pub_partopt);
1600 akapila@postgresql.o 1156 : 35 : result = list_concat(result, schemaRels);
1157 : : }
1158 : :
1159 : 967 : return result;
1160 : : }
1161 : :
1162 : : /*
1163 : : * Get publication using oid
1164 : : *
1165 : : * The Publication struct and its data are palloc'ed here.
1166 : : */
1167 : : Publication *
3342 peter_e@gmx.net 1168 : 6363 : GetPublication(Oid pubid)
1169 : : {
1170 : : HeapTuple tup;
1171 : : Publication *pub;
1172 : : Form_pg_publication pubform;
1173 : :
1174 : 6363 : tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1175 [ - + ]: 6363 : if (!HeapTupleIsValid(tup))
3342 peter_e@gmx.net 1176 [ # # ]:UBC 0 : elog(ERROR, "cache lookup failed for publication %u", pubid);
1177 : :
3342 peter_e@gmx.net 1178 :CBC 6363 : pubform = (Form_pg_publication) GETSTRUCT(tup);
1179 : :
95 michael@paquier.xyz 1180 :GNC 6363 : pub = palloc_object(Publication);
3342 peter_e@gmx.net 1181 :CBC 6363 : pub->oid = pubid;
1182 : 6363 : pub->name = pstrdup(NameStr(pubform->pubname));
1183 : 6363 : pub->alltables = pubform->puballtables;
157 akapila@postgresql.o 1184 :GNC 6363 : pub->allsequences = pubform->puballsequences;
3342 peter_e@gmx.net 1185 :CBC 6363 : pub->pubactions.pubinsert = pubform->pubinsert;
1186 : 6363 : pub->pubactions.pubupdate = pubform->pubupdate;
1187 : 6363 : pub->pubactions.pubdelete = pubform->pubdelete;
2899 1188 : 6363 : pub->pubactions.pubtruncate = pubform->pubtruncate;
2167 peter@eisentraut.org 1189 : 6363 : pub->pubviaroot = pubform->pubviaroot;
411 akapila@postgresql.o 1190 : 6363 : pub->pubgencols_type = pubform->pubgencols;
1191 : :
3342 peter_e@gmx.net 1192 : 6363 : ReleaseSysCache(tup);
1193 : :
1194 : 6363 : return pub;
1195 : : }
1196 : :
1197 : : /*
1198 : : * Get Publication using name.
1199 : : */
1200 : : Publication *
1201 : 1607 : GetPublicationByName(const char *pubname, bool missing_ok)
1202 : : {
1203 : : Oid oid;
1204 : :
2274 alvherre@alvh.no-ip. 1205 : 1607 : oid = get_publication_oid(pubname, missing_ok);
1206 : :
1207 [ + + ]: 1607 : return OidIsValid(oid) ? GetPublication(oid) : NULL;
1208 : : }
1209 : :
1210 : : /*
1211 : : * Get information of the tables in the given publication array.
1212 : : *
1213 : : * Returns pubid, relid, column list, row filter for each table.
1214 : : */
1215 : : Datum
3342 peter_e@gmx.net 1216 : 3176 : pg_get_publication_tables(PG_FUNCTION_ARGS)
1217 : : {
1218 : : #define NUM_PUBLICATION_TABLES_ELEM 4
1219 : : FuncCallContext *funcctx;
1082 akapila@postgresql.o 1220 : 3176 : List *table_infos = NIL;
1221 : :
1222 : : /* stuff done only on the first call of the function */
3342 peter_e@gmx.net 1223 [ + + ]: 3176 : if (SRF_IS_FIRSTCALL())
1224 : : {
1225 : : TupleDesc tupdesc;
1226 : : MemoryContext oldcontext;
1227 : : ArrayType *arr;
1228 : : Datum *elems;
1229 : : int nelems,
1230 : : i;
1082 akapila@postgresql.o 1231 : 1048 : bool viaroot = false;
1232 : :
1233 : : /* create a function context for cross-call persistence */
3342 peter_e@gmx.net 1234 : 1048 : funcctx = SRF_FIRSTCALL_INIT();
1235 : :
1236 : : /* switch to memory context appropriate for multiple function calls */
1237 : 1048 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1238 : :
1239 : : /*
1240 : : * Deconstruct the parameter into elements where each element is a
1241 : : * publication name.
1242 : : */
1082 akapila@postgresql.o 1243 : 1048 : arr = PG_GETARG_ARRAYTYPE_P(0);
520 alvherre@alvh.no-ip. 1244 : 1048 : deconstruct_array_builtin(arr, TEXTOID, &elems, NULL, &nelems);
1245 : :
1246 : : /* Get Oids of tables from each publication. */
1082 akapila@postgresql.o 1247 [ + + ]: 2144 : for (i = 0; i < nelems; i++)
1248 : : {
1249 : : Publication *pub_elem;
1250 : 1096 : List *pub_elem_tables = NIL;
1251 : : ListCell *lc;
1252 : :
1253 : 1096 : pub_elem = GetPublicationByName(TextDatumGetCString(elems[i]), false);
1254 : :
1255 : : /*
1256 : : * Publications support partitioned tables. If
1257 : : * publish_via_partition_root is false, all changes are replicated
1258 : : * using leaf partition identity and schema, so we only need
1259 : : * those. Otherwise, get the partitioned table itself.
1260 : : */
1261 [ + + ]: 1096 : if (pub_elem->alltables)
11 akapila@postgresql.o 1262 :GNC 192 : pub_elem_tables = GetAllPublicationRelations(pub_elem->oid,
1263 : : RELKIND_RELATION,
157 1264 : 192 : pub_elem->pubviaroot);
1265 : : else
1266 : : {
1267 : : List *relids,
1268 : : *schemarelids;
1269 : :
11 1270 : 904 : relids = GetIncludedPublicationRelations(pub_elem->oid,
1271 : 904 : pub_elem->pubviaroot ?
1272 : 904 : PUBLICATION_PART_ROOT :
1273 : : PUBLICATION_PART_LEAF);
1082 akapila@postgresql.o 1274 :CBC 904 : schemarelids = GetAllSchemaPublicationRelations(pub_elem->oid,
1275 : 904 : pub_elem->pubviaroot ?
1276 : 904 : PUBLICATION_PART_ROOT :
1277 : : PUBLICATION_PART_LEAF);
1278 : 904 : pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
1279 : : }
1280 : :
1281 : : /*
1282 : : * Record the published table and the corresponding publication so
1283 : : * that we can get row filters and column lists later.
1284 : : *
1285 : : * When a table is published by multiple publications, to obtain
1286 : : * all row filters and column lists, the structure related to this
1287 : : * table will be recorded multiple times.
1288 : : */
1289 [ + + + + : 3264 : foreach(lc, pub_elem_tables)
+ + ]
1290 : : {
95 michael@paquier.xyz 1291 :GNC 2168 : published_rel *table_info = palloc_object(published_rel);
1292 : :
1082 akapila@postgresql.o 1293 :CBC 2168 : table_info->relid = lfirst_oid(lc);
1294 : 2168 : table_info->pubid = pub_elem->oid;
1295 : 2168 : table_infos = lappend(table_infos, table_info);
1296 : : }
1297 : :
1298 : : /* At least one publication is using publish_via_partition_root. */
1299 [ + + ]: 1096 : if (pub_elem->pubviaroot)
1300 : 187 : viaroot = true;
1301 : : }
1302 : :
1303 : : /*
1304 : : * If the publication publishes partition changes via their respective
1305 : : * root partitioned tables, we must exclude partitions in favor of
1306 : : * including the root partitioned tables. Otherwise, the function
1307 : : * could return both the child and parent tables which could cause
1308 : : * data of the child table to be double-published on the subscriber
1309 : : * side.
1310 : : */
1311 [ + + ]: 1048 : if (viaroot)
1312 : 179 : filter_partitions(table_infos);
1313 : :
1314 : : /* Construct a tuple descriptor for the result rows. */
1361 michael@paquier.xyz 1315 : 1048 : tupdesc = CreateTemplateTupleDesc(NUM_PUBLICATION_TABLES_ELEM);
1082 akapila@postgresql.o 1316 : 1048 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "pubid",
1317 : : OIDOID, -1, 0);
1318 : 1048 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "relid",
1319 : : OIDOID, -1, 0);
1320 : 1048 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "attrs",
1321 : : INT2VECTOROID, -1, 0);
1322 : 1048 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "qual",
1323 : : PG_NODE_TREEOID, -1, 0);
1324 : :
1396 1325 : 1048 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
472 peter@eisentraut.org 1326 : 1048 : funcctx->user_fctx = table_infos;
1327 : :
3342 peter_e@gmx.net 1328 : 1048 : MemoryContextSwitchTo(oldcontext);
1329 : : }
1330 : :
1331 : : /* stuff done on every call of the function */
1332 : 3176 : funcctx = SRF_PERCALL_SETUP();
1082 akapila@postgresql.o 1333 : 3176 : table_infos = (List *) funcctx->user_fctx;
1334 : :
1335 [ + + ]: 3176 : if (funcctx->call_cntr < list_length(table_infos))
1336 : : {
1396 1337 : 2128 : HeapTuple pubtuple = NULL;
1338 : : HeapTuple rettuple;
1339 : : Publication *pub;
1082 1340 : 2128 : published_rel *table_info = (published_rel *) list_nth(table_infos, funcctx->call_cntr);
1341 : 2128 : Oid relid = table_info->relid;
1269 1342 : 2128 : Oid schemaid = get_rel_namespace(relid);
1338 peter@eisentraut.org 1343 : 2128 : Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0};
1344 : 2128 : bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0};
1345 : :
1346 : : /*
1347 : : * Form tuple with appropriate data.
1348 : : */
1349 : :
1082 akapila@postgresql.o 1350 : 2128 : pub = GetPublication(table_info->pubid);
1351 : :
1352 : 2128 : values[0] = ObjectIdGetDatum(pub->oid);
1353 : 2128 : values[1] = ObjectIdGetDatum(relid);
1354 : :
1355 : : /*
1356 : : * We don't consider row filters or column lists for FOR ALL TABLES or
1357 : : * FOR TABLES IN SCHEMA publications.
1358 : : */
1359 [ + + ]: 2128 : if (!pub->alltables &&
1269 1360 [ + + ]: 1342 : !SearchSysCacheExists2(PUBLICATIONNAMESPACEMAP,
1361 : : ObjectIdGetDatum(schemaid),
1362 : : ObjectIdGetDatum(pub->oid)))
1363 : 1295 : pubtuple = SearchSysCacheCopy2(PUBLICATIONRELMAP,
1364 : : ObjectIdGetDatum(relid),
1365 : : ObjectIdGetDatum(pub->oid));
1366 : :
1396 1367 [ + + ]: 2128 : if (HeapTupleIsValid(pubtuple))
1368 : : {
1369 : : /* Lookup the column list attribute. */
1082 1370 : 1182 : values[2] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1371 : : Anum_pg_publication_rel_prattrs,
1372 : : &(nulls[2]));
1373 : :
1374 : : /* Null indicates no filter. */
1375 : 1182 : values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple,
1376 : : Anum_pg_publication_rel_prqual,
1377 : : &(nulls[3]));
1378 : : }
1379 : : else
1380 : : {
1396 1381 : 946 : nulls[2] = true;
1082 1382 : 946 : nulls[3] = true;
1383 : : }
1384 : :
1385 : : /* Show all columns when the column list is not specified. */
1386 [ + + ]: 2128 : if (nulls[2])
1387 : : {
1157 1388 : 2002 : Relation rel = table_open(relid, AccessShareLock);
1389 : 2002 : int nattnums = 0;
1390 : : int16 *attnums;
1391 : 2002 : TupleDesc desc = RelationGetDescr(rel);
1392 : : int i;
1393 : :
95 michael@paquier.xyz 1394 :GNC 2002 : attnums = palloc_array(int16, desc->natts);
1395 : :
1157 akapila@postgresql.o 1396 [ + + ]:CBC 5468 : for (i = 0; i < desc->natts; i++)
1397 : : {
1398 : 3466 : Form_pg_attribute att = TupleDescAttr(desc, i);
1399 : :
416 1400 [ + + ]: 3466 : if (att->attisdropped)
1157 1401 : 6 : continue;
1402 : :
416 1403 [ + + ]: 3460 : if (att->attgenerated)
1404 : : {
1405 : : /* We only support replication of STORED generated cols. */
1406 [ + + ]: 48 : if (att->attgenerated != ATTRIBUTE_GENERATED_STORED)
1407 : 36 : continue;
1408 : :
1409 : : /*
1410 : : * User hasn't requested to replicate STORED generated
1411 : : * cols.
1412 : : */
1413 [ + + ]: 12 : if (pub->pubgencols_type != PUBLISH_GENCOLS_STORED)
1414 : 9 : continue;
1415 : : }
1416 : :
1157 1417 : 3415 : attnums[nattnums++] = att->attnum;
1418 : : }
1419 : :
1420 [ + + ]: 2002 : if (nattnums > 0)
1421 : : {
1082 1422 : 1980 : values[2] = PointerGetDatum(buildint2vector(attnums, nattnums));
1423 : 1980 : nulls[2] = false;
1424 : : }
1425 : :
1157 1426 : 2002 : table_close(rel, AccessShareLock);
1427 : : }
1428 : :
1396 1429 : 2128 : rettuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
1430 : :
1431 : 2128 : SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(rettuple));
1432 : : }
1433 : :
3342 peter_e@gmx.net 1434 : 1048 : SRF_RETURN_DONE(funcctx);
1435 : : }
1436 : :
1437 : : /*
1438 : : * Returns Oids of sequences in a publication.
1439 : : */
1440 : : Datum
157 akapila@postgresql.o 1441 :GNC 232 : pg_get_publication_sequences(PG_FUNCTION_ARGS)
1442 : : {
1443 : : FuncCallContext *funcctx;
1444 : 232 : List *sequences = NIL;
1445 : :
1446 : : /* stuff done only on the first call of the function */
1447 [ + + ]: 232 : if (SRF_IS_FIRSTCALL())
1448 : : {
1449 : 215 : char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1450 : : Publication *publication;
1451 : : MemoryContext oldcontext;
1452 : :
1453 : : /* create a function context for cross-call persistence */
1454 : 215 : funcctx = SRF_FIRSTCALL_INIT();
1455 : :
1456 : : /* switch to memory context appropriate for multiple function calls */
1457 : 215 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1458 : :
1459 : 215 : publication = GetPublicationByName(pubname, false);
1460 : :
1461 [ + + ]: 215 : if (publication->allsequences)
11 1462 : 6 : sequences = GetAllPublicationRelations(publication->oid,
1463 : : RELKIND_SEQUENCE,
1464 : : false);
1465 : :
114 peter@eisentraut.org 1466 : 215 : funcctx->user_fctx = sequences;
1467 : :
157 akapila@postgresql.o 1468 : 215 : MemoryContextSwitchTo(oldcontext);
1469 : : }
1470 : :
1471 : : /* stuff done on every call of the function */
1472 : 232 : funcctx = SRF_PERCALL_SETUP();
1473 : 232 : sequences = (List *) funcctx->user_fctx;
1474 : :
1475 [ + + ]: 232 : if (funcctx->call_cntr < list_length(sequences))
1476 : : {
1477 : 17 : Oid relid = list_nth_oid(sequences, funcctx->call_cntr);
1478 : :
1479 : 17 : SRF_RETURN_NEXT(funcctx, ObjectIdGetDatum(relid));
1480 : : }
1481 : :
1482 : 215 : SRF_RETURN_DONE(funcctx);
1483 : : }
|